Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 18:29:15 +08:00
commit be476a3fea
76 changed files with 12812 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
# API Design Examples Index
**All example files are under 500 lines for optimal loading.**
## Available Examples
- **[fastapi-crud.md](fastapi-crud.md)** - Complete CRUD endpoints with repository pattern (332 lines)
- **[pydantic-schemas.md](pydantic-schemas.md)** - Request/response schemas with validation
- **[tanstack-start.md](tanstack-start.md)** - TanStack Start server functions
- **[pagination.md](pagination.md)** - Offset and cursor-based pagination
- **[testing.md](testing.md)** - API endpoint testing with pytest
## Quick Links
- **Templates**: See [../templates/](../templates/) for copy-paste ready code
- **Reference**: See [../reference/](../reference/) for complete configurations
- **Checklists**: See [../checklists/](../checklists/) for API design validation

View File

@@ -0,0 +1,332 @@
# FastAPI CRUD Endpoints
**Complete CRUD endpoint examples with repository pattern and tenant isolation.**
## Complete User CRUD
```python
# app/api/routes/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session
from typing import Optional
from app.core.dependencies import get_current_user, get_db
from app.models.user import User
from app.repositories.user_repository import UserRepository
from app.schemas.user import UserCreate, UserRead, UserUpdate, PaginatedResponse
router = APIRouter(prefix="/api/v1/users", tags=["users"])
@router.get("", response_model=PaginatedResponse[UserRead], status_code=status.HTTP_200_OK)
async def list_users(
skip: int = 0,
limit: int = 100,
is_active: Optional[bool] = None,
email_contains: Optional[str] = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> PaginatedResponse[UserRead]:
"""
List all users in the current tenant with pagination and filtering.
- **skip**: Number of records to skip (pagination offset)
- **limit**: Maximum number of records to return (max 100)
- **is_active**: Filter by active status (optional)
- **email_contains**: Filter by email substring (optional)
- **Returns**: Paginated list of users with public fields only
"""
repository = UserRepository(db, tenant_id=current_user.tenant_id)
# Build filter criteria
filters = {}
if is_active is not None:
filters["is_active"] = is_active
if email_contains:
filters["email_contains"] = email_contains
users = await repository.list(filters=filters, skip=skip, limit=limit)
total = await repository.count(filters=filters)
return PaginatedResponse(
items=users,
total=total,
skip=skip,
limit=limit,
has_more=(skip + limit) < total,
)
@router.get("/{user_id}", response_model=UserRead, status_code=status.HTTP_200_OK)
async def get_user(
user_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> UserRead:
"""Get a single user by ID (tenant-isolated)."""
repository = UserRepository(db, tenant_id=current_user.tenant_id)
user = await repository.get_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with ID {user_id} not found",
)
return user
@router.post("", response_model=UserRead, status_code=status.HTTP_201_CREATED)
async def create_user(
user_data: UserCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> UserRead:
"""
Create a new user in the current tenant.
- **email**: Valid email address (unique per tenant)
- **full_name**: User's full name (1-255 characters)
- **password**: At least 8 characters
- **Returns**: Created user with ID, timestamps, and public fields
**Errors**:
- 409 Conflict: Email already exists in tenant
- 422 Validation Error: Invalid email or weak password
"""
repository = UserRepository(db, tenant_id=current_user.tenant_id)
try:
user = await repository.create(user_data)
return user
except IntegrityError:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"User with email {user_data.email} already exists",
)
@router.put("/{user_id}", response_model=UserRead, status_code=status.HTTP_200_OK)
async def update_user(
user_id: str,
user_data: UserUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> UserRead:
"""
Update an existing user (tenant-isolated).
All fields are optional - only provided fields will be updated.
"""
repository = UserRepository(db, tenant_id=current_user.tenant_id)
user = await repository.update(user_id, user_data)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with ID {user_id} not found",
)
return user
@router.patch("/{user_id}", response_model=UserRead, status_code=status.HTTP_200_OK)
async def partial_update_user(
user_id: str,
user_data: UserUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> UserRead:
"""
Partially update an existing user (tenant-isolated).
Same as PUT but semantically indicates partial update.
"""
repository = UserRepository(db, tenant_id=current_user.tenant_id)
user = await repository.update(user_id, user_data)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with ID {user_id} not found",
)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> None:
"""
Soft-delete a user (tenant-isolated).
Returns 204 No Content on success (no response body).
"""
repository = UserRepository(db, tenant_id=current_user.tenant_id)
success = await repository.delete(user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with ID {user_id} not found",
)
```
## Repository Pattern with Tenant Isolation
```python
# app/repositories/user_repository.py
from sqlmodel import Session, select, func
from typing import Optional
from datetime import datetime
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class UserRepository:
"""Repository with automatic tenant filtering."""
def __init__(self, db: Session, tenant_id: str):
self.db = db
self.tenant_id = tenant_id
async def list(
self, filters: dict = None, skip: int = 0, limit: int = 100
) -> list[User]:
"""List users (automatically filtered by tenant_id)."""
statement = select(User).where(User.tenant_id == self.tenant_id)
# Apply additional filters
if filters:
if "is_active" in filters:
statement = statement.where(User.is_active == filters["is_active"])
if "email_contains" in filters:
statement = statement.where(User.email.contains(filters["email_contains"]))
statement = statement.offset(skip).limit(limit).order_by(User.created_at.desc())
result = await self.db.execute(statement)
return result.scalars().all()
async def count(self, filters: dict = None) -> int:
"""Count users (tenant-isolated)."""
statement = select(func.count(User.id)).where(User.tenant_id == self.tenant_id)
# Apply same filters as list()
if filters:
if "is_active" in filters:
statement = statement.where(User.is_active == filters["is_active"])
if "email_contains" in filters:
statement = statement.where(User.email.contains(filters["email_contains"]))
result = await self.db.execute(statement)
return result.scalar_one()
async def get_by_id(self, user_id: str) -> User | None:
"""Get user by ID (tenant-isolated)."""
statement = select(User).where(
User.id == user_id,
User.tenant_id == self.tenant_id,
)
result = await self.db.execute(statement)
return result.scalar_one_or_none()
async def create(self, user_data: UserCreate) -> User:
"""Create a new user in the current tenant."""
from app.utils.password import hash_password
user = User(
email=user_data.email,
full_name=user_data.full_name,
hashed_password=hash_password(user_data.password),
tenant_id=self.tenant_id, # Automatic tenant assignment
)
self.db.add(user)
await self.db.commit()
await self.db.refresh(user)
return user
async def update(self, user_id: str, user_data: UserUpdate) -> User | None:
"""Update an existing user (tenant-isolated)."""
user = await self.get_by_id(user_id)
if not user:
return None
# Update only provided fields
update_data = user_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
await self.db.commit()
await self.db.refresh(user)
return user
async def delete(self, user_id: str) -> bool:
"""Soft-delete a user (tenant-isolated)."""
user = await self.get_by_id(user_id)
if not user:
return False
# Soft delete by setting deleted_at timestamp
user.deleted_at = datetime.utcnow()
await self.db.commit()
return True
```
## Nested Resources
```python
# app/api/routes/organizations.py
from fastapi import APIRouter, Depends, HTTPException, status
router = APIRouter(prefix="/api/v1/organizations", tags=["organizations"])
@router.get("/{org_id}/teams", response_model=list[TeamRead])
async def list_organization_teams(
org_id: str,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> list[TeamRead]:
"""List all teams in an organization (tenant-isolated)."""
# Verify organization exists and belongs to tenant
org_repo = OrganizationRepository(db, tenant_id=current_user.tenant_id)
org = await org_repo.get_by_id(org_id)
if not org:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Organization with ID {org_id} not found",
)
# Fetch teams for organization
team_repo = TeamRepository(db, tenant_id=current_user.tenant_id)
teams = await team_repo.list_by_organization(org_id, skip=skip, limit=limit)
return teams
@router.post("/{org_id}/teams", response_model=TeamRead, status_code=status.HTTP_201_CREATED)
async def create_organization_team(
org_id: str,
team_data: TeamCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> TeamRead:
"""Create a new team in an organization."""
# Verify organization exists
org_repo = OrganizationRepository(db, tenant_id=current_user.tenant_id)
org = await org_repo.get_by_id(org_id)
if not org:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Organization with ID {org_id} not found",
)
# Create team
team_repo = TeamRepository(db, tenant_id=current_user.tenant_id)
team = await team_repo.create(team_data, organization_id=org_id)
return team
```
**See also:**
- [pydantic-schemas.md](pydantic-schemas.md) - Request/response schema patterns
- [pagination.md](pagination.md) - Pagination and filtering examples
- [../templates/fastapi-crud-endpoint.py](../templates/fastapi-crud-endpoint.py) - CRUD template

View File

@@ -0,0 +1,38 @@
# Pagination Patterns
**Offset-based and cursor-based pagination examples.**
## Offset-Based Pagination
```python
class PaginatedResponse[T](BaseModel):
items: list[T]
total: int
skip: int
limit: int
has_more: bool
@router.get("", response_model=PaginatedResponse[UserRead])
async def list_users(skip: int = 0, limit: int = 100):
repository = UserRepository(db, tenant_id=current_user.tenant_id)
users = await repository.list(skip=skip, limit=limit)
total = await repository.count()
return PaginatedResponse(items=users, total=total, skip=skip, limit=limit, has_more=(skip + limit) < total)
```
## Cursor-Based Pagination (Recommended)
```python
class CursorPaginatedResponse[T](BaseModel):
items: list[T]
next_cursor: Optional[str]
has_more: bool
@router.get("")
async def list_users(cursor: Optional[str] = None, limit: int = 100):
users = await repository.list_cursor(cursor=cursor, limit=limit)
next_cursor = users[-1].id if len(users) == limit else None
return CursorPaginatedResponse(items=users, next_cursor=next_cursor, has_more=next_cursor is not None)
```
**See also:** [fastapi-crud.md](fastapi-crud.md) for complete examples

View File

@@ -0,0 +1,149 @@
# Pydantic Schema Examples
**Complete Pydantic schema patterns for request/response validation.**
## Request/Response Schemas
```python
# app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field, ConfigDict, field_validator, model_validator
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
"""Shared fields for User schemas."""
email: EmailStr
full_name: str = Field(..., min_length=1, max_length=255)
is_active: bool = True
class UserCreate(UserBase):
"""Schema for creating a new user."""
password: str = Field(..., min_length=8, max_length=100)
password_confirm: str = Field(..., min_length=8, max_length=100)
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
"""Ensure password meets complexity requirements."""
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
if not any(char.isdigit() for char in v):
raise ValueError("Password must contain at least one digit")
if not any(char.isupper() for char in v):
raise ValueError("Password must contain at least one uppercase letter")
if not any(char.islower() for char in v):
raise ValueError("Password must contain at least one lowercase letter")
return v
@model_validator(mode="after")
def passwords_match(self) -> "UserCreate":
"""Ensure password and password_confirm match."""
if self.password != self.password_confirm:
raise ValueError("Passwords do not match")
return self
class UserUpdate(BaseModel):
"""Schema for updating an existing user (all fields optional)."""
email: Optional[EmailStr] = None
full_name: Optional[str] = Field(None, min_length=1, max_length=255)
is_active: Optional[bool] = None
class UserRead(UserBase):
"""Schema for reading user data (public fields only)."""
id: str
tenant_id: str
created_at: datetime
updated_at: datetime
model_config = ConfigDict(from_attributes=True)
class PaginatedResponse[T](BaseModel):
"""Generic paginated response."""
items: list[T]
total: int
skip: int
limit: int
has_more: bool
```
## Nested Schemas
```python
# app/schemas/organization.py
from pydantic import BaseModel, ConfigDict, Field
from datetime import datetime
from typing import Optional
from app.schemas.team import TeamRead
class OrganizationBase(BaseModel):
"""Shared fields for Organization schemas."""
name: str = Field(..., min_length=1, max_length=255)
description: Optional[str] = None
class OrganizationCreate(OrganizationBase):
"""Schema for creating a new organization."""
pass
class OrganizationRead(OrganizationBase):
"""Organization with nested teams."""
id: str
tenant_id: str
teams: list[TeamRead] = [] # Nested teams
created_at: datetime
updated_at: datetime
model_config = ConfigDict(from_attributes=True)
```
## Custom Validation
```python
# app/schemas/user.py
from pydantic import field_validator, model_validator
import re
class UserCreate(BaseModel):
email: EmailStr
username: str
password: str
@field_validator("username")
@classmethod
def validate_username(cls, v: str) -> str:
"""Ensure username is alphanumeric."""
if not re.match(r"^[a-zA-Z0-9_-]+$", v):
raise ValueError("Username must contain only letters, numbers, hyphens, and underscores")
if len(v) < 3:
raise ValueError("Username must be at least 3 characters")
return v
@field_validator("email")
@classmethod
def validate_email_domain(cls, v: str) -> str:
"""Ensure email is from allowed domain."""
allowed_domains = ["example.com", "greyhaven.studio"]
domain = v.split("@")[1]
if domain not in allowed_domains:
raise ValueError(f"Email must be from {', '.join(allowed_domains)}")
return v
@model_validator(mode="after")
def validate_username_not_in_email(self) -> "UserCreate":
"""Ensure username is not part of email."""
if self.username.lower() in self.email.lower():
raise ValueError("Username cannot be part of email address")
return self
```
**See also:**
- [fastapi-crud.md](fastapi-crud.md) - CRUD endpoints using these schemas
- [../templates/pydantic-schemas.py](../templates/pydantic-schemas.py) - Schema template

View File

@@ -0,0 +1,71 @@
# TanStack Start Server Functions
**Complete server function examples with Drizzle ORM and tenant isolation.**
See [../templates/tanstack-server-function.ts](../templates/tanstack-server-function.ts) for full template.
## Complete CRUD Server Functions
```typescript
// app/routes/api/users.ts
import { createServerFn } from "@tanstack/start";
import { z } from "zod";
import { db } from "~/utils/db.server";
import { usersTable } from "~/db/schema";
import { getAuthUser } from "~/utils/auth.server";
import { eq, and, like, count, desc } from "drizzle-orm";
import { hashPassword } from "~/utils/password.server";
// Validation schemas
const createUserSchema = z.object({
email: z.string().email(),
fullName: z.string().min(1).max(255),
password: z.string().min(8),
});
const updateUserSchema = z.object({
email: z.string().email().optional(),
fullName: z.string().min(1).max(255).optional(),
isActive: z.boolean().optional(),
});
// List users
export const listUsers = createServerFn({ method: "GET" })
.validator(z.object({ skip: z.number().min(0).default(0), limit: z.number().min(1).max(100).default(100) }))
.handler(async ({ data, context }) => {
const authUser = await getAuthUser(context);
if (!authUser) throw new Error("Unauthorized", { status: 401 });
const users = await db.select().from(usersTable)
.where(eq(usersTable.tenantId, authUser.tenantId))
.orderBy(desc(usersTable.createdAt))
.limit(data.limit).offset(data.skip);
const [{ count: total }] = await db.select({ count: count() })
.from(usersTable).where(eq(usersTable.tenantId, authUser.tenantId));
return {
items: users.map(({ hashedPassword, ...user }) => user),
total, skip: data.skip, limit: data.limit,
hasMore: data.skip + data.limit < total,
};
});
// Create user
export const createUser = createServerFn({ method: "POST" })
.validator(createUserSchema)
.handler(async ({ data, context }) => {
const authUser = await getAuthUser(context);
if (!authUser) throw new Error("Unauthorized", { status: 401 });
const [user] = await db.insert(usersTable).values({
...data, hashedPassword: await hashPassword(data.password),
tenantId: authUser.tenantId,
}).returning();
const { hashedPassword: _, ...userPublic } = user;
return userPublic;
});
```
**See also:** [fastapi-crud.md](fastapi-crud.md) for FastAPI examples

View File

@@ -0,0 +1,22 @@
# API Testing Examples
**pytest examples for FastAPI endpoints.**
```python
# tests/api/test_users.py
import pytest
from fastapi.testclient import TestClient
@pytest.mark.integration
def test_create_user(test_token: str):
response = client.post("/api/v1/users", json={"email": "test@example.com", "full_name": "Test", "password": "Pass123"}, headers={"Authorization": f"Bearer {test_token}"})
assert response.status_code == 201
assert "id" in response.json()
@pytest.mark.integration
def test_tenant_isolation(test_token: str):
response = client.get(f"/api/v1/users/{other_tenant_user_id}", headers={"Authorization": f"Bearer {test_token}"})
assert response.status_code == 404 # Cannot access other tenant's data
```
Run with: `doppler run --config test -- pytest tests/api/ -v`