Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 17:51:28 +08:00
commit ad3f579904
14 changed files with 3695 additions and 0 deletions

View File

@@ -0,0 +1,527 @@
---
name: api-design-principles
description: Master REST and GraphQL API design principles to build intuitive, scalable, and maintainable APIs that delight developers. Use when designing new APIs, reviewing API specifications, or establishing API design standards.
---
# API Design Principles
Master REST and GraphQL API design principles to build intuitive, scalable, and maintainable APIs that delight developers and stand the test of time.
## When to Use This Skill
- Designing new REST or GraphQL APIs
- Refactoring existing APIs for better usability
- Establishing API design standards for your team
- Reviewing API specifications before implementation
- Migrating between API paradigms (REST to GraphQL, etc.)
- Creating developer-friendly API documentation
- Optimizing APIs for specific use cases (mobile, third-party integrations)
## Core Concepts
### 1. RESTful Design Principles
**Resource-Oriented Architecture**
- Resources are nouns (users, orders, products), not verbs
- Use HTTP methods for actions (GET, POST, PUT, PATCH, DELETE)
- URLs represent resource hierarchies
- Consistent naming conventions
**HTTP Methods Semantics:**
- `GET`: Retrieve resources (idempotent, safe)
- `POST`: Create new resources
- `PUT`: Replace entire resource (idempotent)
- `PATCH`: Partial resource updates
- `DELETE`: Remove resources (idempotent)
### 2. GraphQL Design Principles
**Schema-First Development**
- Types define your domain model
- Queries for reading data
- Mutations for modifying data
- Subscriptions for real-time updates
**Query Structure:**
- Clients request exactly what they need
- Single endpoint, multiple operations
- Strongly typed schema
- Introspection built-in
### 3. API Versioning Strategies
**URL Versioning:**
```
/api/v1/users
/api/v2/users
```
**Header Versioning:**
```
Accept: application/vnd.api+json; version=1
```
**Query Parameter Versioning:**
```
/api/users?version=1
```
## REST API Design Patterns
### Pattern 1: Resource Collection Design
```python
# Good: Resource-oriented endpoints
GET /api/users # List users (with pagination)
POST /api/users # Create user
GET /api/users/{id} # Get specific user
PUT /api/users/{id} # Replace user
PATCH /api/users/{id} # Update user fields
DELETE /api/users/{id} # Delete user
# Nested resources
GET /api/users/{id}/orders # Get user's orders
POST /api/users/{id}/orders # Create order for user
# Bad: Action-oriented endpoints (avoid)
POST /api/createUser
POST /api/getUserById
POST /api/deleteUser
```
### Pattern 2: Pagination and Filtering
```python
from typing import List, Optional
from pydantic import BaseModel, Field
class PaginationParams(BaseModel):
page: int = Field(1, ge=1, description="Page number")
page_size: int = Field(20, ge=1, le=100, description="Items per page")
class FilterParams(BaseModel):
status: Optional[str] = None
created_after: Optional[str] = None
search: Optional[str] = None
class PaginatedResponse(BaseModel):
items: List[dict]
total: int
page: int
page_size: int
pages: int
@property
def has_next(self) -> bool:
return self.page < self.pages
@property
def has_prev(self) -> bool:
return self.page > 1
# FastAPI endpoint example
from fastapi import FastAPI, Query, Depends
app = FastAPI()
@app.get("/api/users", response_model=PaginatedResponse)
async def list_users(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
status: Optional[str] = Query(None),
search: Optional[str] = Query(None)
):
# Apply filters
query = build_query(status=status, search=search)
# Count total
total = await count_users(query)
# Fetch page
offset = (page - 1) * page_size
users = await fetch_users(query, limit=page_size, offset=offset)
return PaginatedResponse(
items=users,
total=total,
page=page,
page_size=page_size,
pages=(total + page_size - 1) // page_size
)
```
### Pattern 3: Error Handling and Status Codes
```python
from fastapi import HTTPException, status
from pydantic import BaseModel
class ErrorResponse(BaseModel):
error: str
message: str
details: Optional[dict] = None
timestamp: str
path: str
class ValidationErrorDetail(BaseModel):
field: str
message: str
value: Any
# Consistent error responses
STATUS_CODES = {
"success": 200,
"created": 201,
"no_content": 204,
"bad_request": 400,
"unauthorized": 401,
"forbidden": 403,
"not_found": 404,
"conflict": 409,
"unprocessable": 422,
"internal_error": 500
}
def raise_not_found(resource: str, id: str):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
"error": "NotFound",
"message": f"{resource} not found",
"details": {"id": id}
}
)
def raise_validation_error(errors: List[ValidationErrorDetail]):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"error": "ValidationError",
"message": "Request validation failed",
"details": {"errors": [e.dict() for e in errors]}
}
)
# Example usage
@app.get("/api/users/{user_id}")
async def get_user(user_id: str):
user = await fetch_user(user_id)
if not user:
raise_not_found("User", user_id)
return user
```
### Pattern 4: HATEOAS (Hypermedia as the Engine of Application State)
```python
class UserResponse(BaseModel):
id: str
name: str
email: str
_links: dict
@classmethod
def from_user(cls, user: User, base_url: str):
return cls(
id=user.id,
name=user.name,
email=user.email,
_links={
"self": {"href": f"{base_url}/api/users/{user.id}"},
"orders": {"href": f"{base_url}/api/users/{user.id}/orders"},
"update": {
"href": f"{base_url}/api/users/{user.id}",
"method": "PATCH"
},
"delete": {
"href": f"{base_url}/api/users/{user.id}",
"method": "DELETE"
}
}
)
```
## GraphQL Design Patterns
### Pattern 1: Schema Design
```graphql
# schema.graphql
# Clear type definitions
type User {
id: ID!
email: String!
name: String!
createdAt: DateTime!
# Relationships
orders(
first: Int = 20
after: String
status: OrderStatus
): OrderConnection!
profile: UserProfile
}
type Order {
id: ID!
status: OrderStatus!
total: Money!
items: [OrderItem!]!
createdAt: DateTime!
# Back-reference
user: User!
}
# Pagination pattern (Relay-style)
type OrderConnection {
edges: [OrderEdge!]!
pageInfo: PageInfo!
totalCount: Int!
}
type OrderEdge {
node: Order!
cursor: String!
}
type PageInfo {
hasNextPage: Boolean!
hasPreviousPage: Boolean!
startCursor: String
endCursor: String
}
# Enums for type safety
enum OrderStatus {
PENDING
CONFIRMED
SHIPPED
DELIVERED
CANCELLED
}
# Custom scalars
scalar DateTime
scalar Money
# Query root
type Query {
user(id: ID!): User
users(
first: Int = 20
after: String
search: String
): UserConnection!
order(id: ID!): Order
}
# Mutation root
type Mutation {
createUser(input: CreateUserInput!): CreateUserPayload!
updateUser(input: UpdateUserInput!): UpdateUserPayload!
deleteUser(id: ID!): DeleteUserPayload!
createOrder(input: CreateOrderInput!): CreateOrderPayload!
}
# Input types for mutations
input CreateUserInput {
email: String!
name: String!
password: String!
}
# Payload types for mutations
type CreateUserPayload {
user: User
errors: [Error!]
}
type Error {
field: String
message: String!
}
```
### Pattern 2: Resolver Design
```python
from typing import Optional, List
from ariadne import QueryType, MutationType, ObjectType
from dataclasses import dataclass
query = QueryType()
mutation = MutationType()
user_type = ObjectType("User")
@query.field("user")
async def resolve_user(obj, info, id: str) -> Optional[dict]:
"""Resolve single user by ID."""
return await fetch_user_by_id(id)
@query.field("users")
async def resolve_users(
obj,
info,
first: int = 20,
after: Optional[str] = None,
search: Optional[str] = None
) -> dict:
"""Resolve paginated user list."""
# Decode cursor
offset = decode_cursor(after) if after else 0
# Fetch users
users = await fetch_users(
limit=first + 1, # Fetch one extra to check hasNextPage
offset=offset,
search=search
)
# Pagination
has_next = len(users) > first
if has_next:
users = users[:first]
edges = [
{
"node": user,
"cursor": encode_cursor(offset + i)
}
for i, user in enumerate(users)
]
return {
"edges": edges,
"pageInfo": {
"hasNextPage": has_next,
"hasPreviousPage": offset > 0,
"startCursor": edges[0]["cursor"] if edges else None,
"endCursor": edges[-1]["cursor"] if edges else None
},
"totalCount": await count_users(search=search)
}
@user_type.field("orders")
async def resolve_user_orders(user: dict, info, first: int = 20) -> dict:
"""Resolve user's orders (N+1 prevention with DataLoader)."""
# Use DataLoader to batch requests
loader = info.context["loaders"]["orders_by_user"]
orders = await loader.load(user["id"])
return paginate_orders(orders, first)
@mutation.field("createUser")
async def resolve_create_user(obj, info, input: dict) -> dict:
"""Create new user."""
try:
# Validate input
validate_user_input(input)
# Create user
user = await create_user(
email=input["email"],
name=input["name"],
password=hash_password(input["password"])
)
return {
"user": user,
"errors": []
}
except ValidationError as e:
return {
"user": None,
"errors": [{"field": e.field, "message": e.message}]
}
```
### Pattern 3: DataLoader (N+1 Problem Prevention)
```python
from aiodataloader import DataLoader
from typing import List, Optional
class UserLoader(DataLoader):
"""Batch load users by ID."""
async def batch_load_fn(self, user_ids: List[str]) -> List[Optional[dict]]:
"""Load multiple users in single query."""
users = await fetch_users_by_ids(user_ids)
# Map results back to input order
user_map = {user["id"]: user for user in users}
return [user_map.get(user_id) for user_id in user_ids]
class OrdersByUserLoader(DataLoader):
"""Batch load orders by user ID."""
async def batch_load_fn(self, user_ids: List[str]) -> List[List[dict]]:
"""Load orders for multiple users in single query."""
orders = await fetch_orders_by_user_ids(user_ids)
# Group orders by user_id
orders_by_user = {}
for order in orders:
user_id = order["user_id"]
if user_id not in orders_by_user:
orders_by_user[user_id] = []
orders_by_user[user_id].append(order)
# Return in input order
return [orders_by_user.get(user_id, []) for user_id in user_ids]
# Context setup
def create_context():
return {
"loaders": {
"user": UserLoader(),
"orders_by_user": OrdersByUserLoader()
}
}
```
## Best Practices
### REST APIs
1. **Consistent Naming**: Use plural nouns for collections (`/users`, not `/user`)
2. **Stateless**: Each request contains all necessary information
3. **Use HTTP Status Codes Correctly**: 2xx success, 4xx client errors, 5xx server errors
4. **Version Your API**: Plan for breaking changes from day one
5. **Pagination**: Always paginate large collections
6. **Rate Limiting**: Protect your API with rate limits
7. **Documentation**: Use OpenAPI/Swagger for interactive docs
### GraphQL APIs
1. **Schema First**: Design schema before writing resolvers
2. **Avoid N+1**: Use DataLoaders for efficient data fetching
3. **Input Validation**: Validate at schema and resolver levels
4. **Error Handling**: Return structured errors in mutation payloads
5. **Pagination**: Use cursor-based pagination (Relay spec)
6. **Deprecation**: Use `@deprecated` directive for gradual migration
7. **Monitoring**: Track query complexity and execution time
## Common Pitfalls
- **Over-fetching/Under-fetching (REST)**: Fixed in GraphQL but requires DataLoaders
- **Breaking Changes**: Version APIs or use deprecation strategies
- **Inconsistent Error Formats**: Standardize error responses
- **Missing Rate Limits**: APIs without limits are vulnerable to abuse
- **Poor Documentation**: Undocumented APIs frustrate developers
- **Ignoring HTTP Semantics**: POST for idempotent operations breaks expectations
- **Tight Coupling**: API structure shouldn't mirror database schema
## Resources
- **references/rest-best-practices.md**: Comprehensive REST API design guide
- **references/graphql-schema-design.md**: GraphQL schema patterns and anti-patterns
- **references/api-versioning-strategies.md**: Versioning approaches and migration paths
- **assets/rest-api-template.py**: FastAPI REST API template
- **assets/graphql-schema-template.graphql**: Complete GraphQL schema example
- **assets/api-design-checklist.md**: Pre-implementation review checklist
- **scripts/openapi-generator.py**: Generate OpenAPI specs from code

View File

@@ -0,0 +1,136 @@
# API Design Checklist
## Pre-Implementation Review
### Resource Design
- [ ] Resources are nouns, not verbs
- [ ] Plural names for collections
- [ ] Consistent naming across all endpoints
- [ ] Clear resource hierarchy (avoid deep nesting >2 levels)
- [ ] All CRUD operations properly mapped to HTTP methods
### HTTP Methods
- [ ] GET for retrieval (safe, idempotent)
- [ ] POST for creation
- [ ] PUT for full replacement (idempotent)
- [ ] PATCH for partial updates
- [ ] DELETE for removal (idempotent)
### Status Codes
- [ ] 200 OK for successful GET/PATCH/PUT
- [ ] 201 Created for POST
- [ ] 204 No Content for DELETE
- [ ] 400 Bad Request for malformed requests
- [ ] 401 Unauthorized for missing auth
- [ ] 403 Forbidden for insufficient permissions
- [ ] 404 Not Found for missing resources
- [ ] 422 Unprocessable Entity for validation errors
- [ ] 429 Too Many Requests for rate limiting
- [ ] 500 Internal Server Error for server issues
### Pagination
- [ ] All collection endpoints paginated
- [ ] Default page size defined (e.g., 20)
- [ ] Maximum page size enforced (e.g., 100)
- [ ] Pagination metadata included (total, pages, etc.)
- [ ] Cursor-based or offset-based pattern chosen
### Filtering & Sorting
- [ ] Query parameters for filtering
- [ ] Sort parameter supported
- [ ] Search parameter for full-text search
- [ ] Field selection supported (sparse fieldsets)
### Versioning
- [ ] Versioning strategy defined (URL/header/query)
- [ ] Version included in all endpoints
- [ ] Deprecation policy documented
### Error Handling
- [ ] Consistent error response format
- [ ] Detailed error messages
- [ ] Field-level validation errors
- [ ] Error codes for client handling
- [ ] Timestamps in error responses
### Authentication & Authorization
- [ ] Authentication method defined (Bearer token, API key)
- [ ] Authorization checks on all endpoints
- [ ] 401 vs 403 used correctly
- [ ] Token expiration handled
### Rate Limiting
- [ ] Rate limits defined per endpoint/user
- [ ] Rate limit headers included
- [ ] 429 status code for exceeded limits
- [ ] Retry-After header provided
### Documentation
- [ ] OpenAPI/Swagger spec generated
- [ ] All endpoints documented
- [ ] Request/response examples provided
- [ ] Error responses documented
- [ ] Authentication flow documented
### Testing
- [ ] Unit tests for business logic
- [ ] Integration tests for endpoints
- [ ] Error scenarios tested
- [ ] Edge cases covered
- [ ] Performance tests for heavy endpoints
### Security
- [ ] Input validation on all fields
- [ ] SQL injection prevention
- [ ] XSS prevention
- [ ] CORS configured correctly
- [ ] HTTPS enforced
- [ ] Sensitive data not in URLs
- [ ] No secrets in responses
### Performance
- [ ] Database queries optimized
- [ ] N+1 queries prevented
- [ ] Caching strategy defined
- [ ] Cache headers set appropriately
- [ ] Large responses paginated
### Monitoring
- [ ] Logging implemented
- [ ] Error tracking configured
- [ ] Performance metrics collected
- [ ] Health check endpoint available
- [ ] Alerts configured for errors
## GraphQL-Specific Checks
### Schema Design
- [ ] Schema-first approach used
- [ ] Types properly defined
- [ ] Non-null vs nullable decided
- [ ] Interfaces/unions used appropriately
- [ ] Custom scalars defined
### Queries
- [ ] Query depth limiting
- [ ] Query complexity analysis
- [ ] DataLoaders prevent N+1
- [ ] Pagination pattern chosen (Relay/offset)
### Mutations
- [ ] Input types defined
- [ ] Payload types with errors
- [ ] Optimistic response support
- [ ] Idempotency considered
### Performance
- [ ] DataLoader for all relationships
- [ ] Query batching enabled
- [ ] Persisted queries considered
- [ ] Response caching implemented
### Documentation
- [ ] All fields documented
- [ ] Deprecations marked
- [ ] Examples provided
- [ ] Schema introspection enabled

View File

@@ -0,0 +1,165 @@
"""
Production-ready REST API template using FastAPI.
Includes pagination, filtering, error handling, and best practices.
"""
from fastapi import FastAPI, HTTPException, Query, Path, Depends, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, EmailStr
from typing import Optional, List, Any
from datetime import datetime
from enum import Enum
app = FastAPI(
title="API Template",
version="1.0.0",
docs_url="/api/docs"
)
# Models
class UserStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
class UserBase(BaseModel):
email: EmailStr
name: str = Field(..., min_length=1, max_length=100)
status: UserStatus = UserStatus.ACTIVE
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
name: Optional[str] = Field(None, min_length=1, max_length=100)
status: Optional[UserStatus] = None
class User(UserBase):
id: str
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# Pagination
class PaginationParams(BaseModel):
page: int = Field(1, ge=1)
page_size: int = Field(20, ge=1, le=100)
class PaginatedResponse(BaseModel):
items: List[Any]
total: int
page: int
page_size: int
pages: int
# Error handling
class ErrorDetail(BaseModel):
field: Optional[str] = None
message: str
code: str
class ErrorResponse(BaseModel):
error: str
message: str
details: Optional[List[ErrorDetail]] = None
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
return JSONResponse(
status_code=exc.status_code,
content=ErrorResponse(
error=exc.__class__.__name__,
message=exc.detail if isinstance(exc.detail, str) else exc.detail.get("message", "Error"),
details=exc.detail.get("details") if isinstance(exc.detail, dict) else None
).dict()
)
# Endpoints
@app.get("/api/users", response_model=PaginatedResponse, tags=["Users"])
async def list_users(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
status: Optional[UserStatus] = Query(None),
search: Optional[str] = Query(None)
):
"""List users with pagination and filtering."""
# Mock implementation
total = 100
items = [
User(
id=str(i),
email=f"user{i}@example.com",
name=f"User {i}",
status=UserStatus.ACTIVE,
created_at=datetime.now(),
updated_at=datetime.now()
).dict()
for i in range((page-1)*page_size, min(page*page_size, total))
]
return PaginatedResponse(
items=items,
total=total,
page=page,
page_size=page_size,
pages=(total + page_size - 1) // page_size
)
@app.post("/api/users", response_model=User, status_code=status.HTTP_201_CREATED, tags=["Users"])
async def create_user(user: UserCreate):
"""Create a new user."""
# Mock implementation
return User(
id="123",
email=user.email,
name=user.name,
status=user.status,
created_at=datetime.now(),
updated_at=datetime.now()
)
@app.get("/api/users/{user_id}", response_model=User, tags=["Users"])
async def get_user(user_id: str = Path(..., description="User ID")):
"""Get user by ID."""
# Mock: Check if exists
if user_id == "999":
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"message": "User not found", "details": {"id": user_id}}
)
return User(
id=user_id,
email="user@example.com",
name="User Name",
status=UserStatus.ACTIVE,
created_at=datetime.now(),
updated_at=datetime.now()
)
@app.patch("/api/users/{user_id}", response_model=User, tags=["Users"])
async def update_user(user_id: str, update: UserUpdate):
"""Partially update user."""
# Validate user exists
existing = await get_user(user_id)
# Apply updates
update_data = update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(existing, field, value)
existing.updated_at = datetime.now()
return existing
@app.delete("/api/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["Users"])
async def delete_user(user_id: str):
"""Delete user."""
await get_user(user_id) # Verify exists
return None
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,566 @@
# GraphQL Schema Design Patterns
## Schema Organization
### Modular Schema Structure
```graphql
# user.graphql
type User {
id: ID!
email: String!
name: String!
posts: [Post!]!
}
extend type Query {
user(id: ID!): User
users(first: Int, after: String): UserConnection!
}
extend type Mutation {
createUser(input: CreateUserInput!): CreateUserPayload!
}
# post.graphql
type Post {
id: ID!
title: String!
content: String!
author: User!
}
extend type Query {
post(id: ID!): Post
}
```
## Type Design Patterns
### 1. Non-Null Types
```graphql
type User {
id: ID! # Always required
email: String! # Required
phone: String # Optional (nullable)
posts: [Post!]! # Non-null array of non-null posts
tags: [String!] # Nullable array of non-null strings
}
```
### 2. Interfaces for Polymorphism
```graphql
interface Node {
id: ID!
createdAt: DateTime!
}
type User implements Node {
id: ID!
createdAt: DateTime!
email: String!
}
type Post implements Node {
id: ID!
createdAt: DateTime!
title: String!
}
type Query {
node(id: ID!): Node
}
```
### 3. Unions for Heterogeneous Results
```graphql
union SearchResult = User | Post | Comment
type Query {
search(query: String!): [SearchResult!]!
}
# Query example
{
search(query: "graphql") {
... on User {
name
email
}
... on Post {
title
content
}
... on Comment {
text
author { name }
}
}
}
```
### 4. Input Types
```graphql
input CreateUserInput {
email: String!
name: String!
password: String!
profileInput: ProfileInput
}
input ProfileInput {
bio: String
avatar: String
website: String
}
input UpdateUserInput {
id: ID!
email: String
name: String
profileInput: ProfileInput
}
```
## Pagination Patterns
### Relay Cursor Pagination (Recommended)
```graphql
type UserConnection {
edges: [UserEdge!]!
pageInfo: PageInfo!
totalCount: Int!
}
type UserEdge {
node: User!
cursor: String!
}
type PageInfo {
hasNextPage: Boolean!
hasPreviousPage: Boolean!
startCursor: String
endCursor: String
}
type Query {
users(
first: Int
after: String
last: Int
before: String
): UserConnection!
}
# Usage
{
users(first: 10, after: "cursor123") {
edges {
cursor
node {
id
name
}
}
pageInfo {
hasNextPage
endCursor
}
}
}
```
### Offset Pagination (Simpler)
```graphql
type UserList {
items: [User!]!
total: Int!
page: Int!
pageSize: Int!
}
type Query {
users(page: Int = 1, pageSize: Int = 20): UserList!
}
```
## Mutation Design Patterns
### 1. Input/Payload Pattern
```graphql
input CreatePostInput {
title: String!
content: String!
tags: [String!]
}
type CreatePostPayload {
post: Post
errors: [Error!]
success: Boolean!
}
type Error {
field: String
message: String!
code: String!
}
type Mutation {
createPost(input: CreatePostInput!): CreatePostPayload!
}
```
### 2. Optimistic Response Support
```graphql
type UpdateUserPayload {
user: User
clientMutationId: String
errors: [Error!]
}
input UpdateUserInput {
id: ID!
name: String
clientMutationId: String
}
type Mutation {
updateUser(input: UpdateUserInput!): UpdateUserPayload!
}
```
### 3. Batch Mutations
```graphql
input BatchCreateUserInput {
users: [CreateUserInput!]!
}
type BatchCreateUserPayload {
results: [CreateUserResult!]!
successCount: Int!
errorCount: Int!
}
type CreateUserResult {
user: User
errors: [Error!]
index: Int!
}
type Mutation {
batchCreateUsers(input: BatchCreateUserInput!): BatchCreateUserPayload!
}
```
## Field Design
### Arguments and Filtering
```graphql
type Query {
posts(
# Pagination
first: Int = 20
after: String
# Filtering
status: PostStatus
authorId: ID
tag: String
# Sorting
orderBy: PostOrderBy = CREATED_AT
orderDirection: OrderDirection = DESC
# Searching
search: String
): PostConnection!
}
enum PostStatus {
DRAFT
PUBLISHED
ARCHIVED
}
enum PostOrderBy {
CREATED_AT
UPDATED_AT
TITLE
}
enum OrderDirection {
ASC
DESC
}
```
### Computed Fields
```graphql
type User {
firstName: String!
lastName: String!
fullName: String! # Computed in resolver
posts: [Post!]!
postCount: Int! # Computed, doesn't load all posts
}
type Post {
likeCount: Int!
commentCount: Int!
isLikedByViewer: Boolean! # Context-dependent
}
```
## Subscriptions
```graphql
type Subscription {
postAdded: Post!
postUpdated(postId: ID!): Post!
userStatusChanged(userId: ID!): UserStatus!
}
type UserStatus {
userId: ID!
online: Boolean!
lastSeen: DateTime!
}
# Client usage
subscription {
postAdded {
id
title
author {
name
}
}
}
```
## Custom Scalars
```graphql
scalar DateTime
scalar Email
scalar URL
scalar JSON
scalar Money
type User {
email: Email!
website: URL
createdAt: DateTime!
metadata: JSON
}
type Product {
price: Money!
}
```
## Directives
### Built-in Directives
```graphql
type User {
name: String!
email: String! @deprecated(reason: "Use emails field instead")
emails: [String!]!
# Conditional inclusion
privateData: PrivateData @include(if: $isOwner)
}
# Query
query GetUser($isOwner: Boolean!) {
user(id: "123") {
name
privateData @include(if: $isOwner) {
ssn
}
}
}
```
### Custom Directives
```graphql
directive @auth(requires: Role = USER) on FIELD_DEFINITION
enum Role {
USER
ADMIN
MODERATOR
}
type Mutation {
deleteUser(id: ID!): Boolean! @auth(requires: ADMIN)
updateProfile(input: ProfileInput!): User! @auth
}
```
## Error Handling
### Union Error Pattern
```graphql
type User {
id: ID!
email: String!
}
type ValidationError {
field: String!
message: String!
}
type NotFoundError {
message: String!
resourceType: String!
resourceId: ID!
}
type AuthorizationError {
message: String!
}
union UserResult = User | ValidationError | NotFoundError | AuthorizationError
type Query {
user(id: ID!): UserResult!
}
# Usage
{
user(id: "123") {
... on User {
id
email
}
... on NotFoundError {
message
resourceType
}
... on AuthorizationError {
message
}
}
}
```
### Errors in Payload
```graphql
type CreateUserPayload {
user: User
errors: [Error!]
success: Boolean!
}
type Error {
field: String
message: String!
code: ErrorCode!
}
enum ErrorCode {
VALIDATION_ERROR
UNAUTHORIZED
NOT_FOUND
INTERNAL_ERROR
}
```
## N+1 Query Problem Solutions
### DataLoader Pattern
```python
from aiodataloader import DataLoader
class PostLoader(DataLoader):
async def batch_load_fn(self, post_ids):
posts = await db.posts.find({"id": {"$in": post_ids}})
post_map = {post["id"]: post for post in posts}
return [post_map.get(pid) for pid in post_ids]
# Resolver
@user_type.field("posts")
async def resolve_posts(user, info):
loader = info.context["loaders"]["post"]
return await loader.load_many(user["post_ids"])
```
### Query Depth Limiting
```python
from graphql import GraphQLError
def depth_limit_validator(max_depth: int):
def validate(context, node, ancestors):
depth = len(ancestors)
if depth > max_depth:
raise GraphQLError(
f"Query depth {depth} exceeds maximum {max_depth}"
)
return validate
```
### Query Complexity Analysis
```python
def complexity_limit_validator(max_complexity: int):
def calculate_complexity(node):
# Each field = 1, lists multiply
complexity = 1
if is_list_field(node):
complexity *= get_list_size_arg(node)
return complexity
return validate_complexity
```
## Schema Versioning
### Field Deprecation
```graphql
type User {
name: String! @deprecated(reason: "Use firstName and lastName")
firstName: String!
lastName: String!
}
```
### Schema Evolution
```graphql
# v1 - Initial
type User {
name: String!
}
# v2 - Add optional field (backward compatible)
type User {
name: String!
email: String
}
# v3 - Deprecate and add new field
type User {
name: String! @deprecated(reason: "Use firstName/lastName")
firstName: String!
lastName: String!
email: String
}
```
## Best Practices Summary
1. **Nullable vs Non-Null**: Start nullable, make non-null when guaranteed
2. **Input Types**: Always use input types for mutations
3. **Payload Pattern**: Return errors in mutation payloads
4. **Pagination**: Use cursor-based for infinite scroll, offset for simple cases
5. **Naming**: Use camelCase for fields, PascalCase for types
6. **Deprecation**: Use `@deprecated` instead of removing fields
7. **DataLoaders**: Always use for relationships to prevent N+1
8. **Complexity Limits**: Protect against expensive queries
9. **Custom Scalars**: Use for domain-specific types (Email, DateTime)
10. **Documentation**: Document all fields with descriptions

View File

@@ -0,0 +1,385 @@
# REST API Best Practices
## URL Structure
### Resource Naming
```
# Good - Plural nouns
GET /api/users
GET /api/orders
GET /api/products
# Bad - Verbs or mixed conventions
GET /api/getUser
GET /api/user (inconsistent singular)
POST /api/createOrder
```
### Nested Resources
```
# Shallow nesting (preferred)
GET /api/users/{id}/orders
GET /api/orders/{id}
# Deep nesting (avoid)
GET /api/users/{id}/orders/{orderId}/items/{itemId}/reviews
# Better:
GET /api/order-items/{id}/reviews
```
## HTTP Methods and Status Codes
### GET - Retrieve Resources
```
GET /api/users → 200 OK (with list)
GET /api/users/{id} → 200 OK or 404 Not Found
GET /api/users?page=2 → 200 OK (paginated)
```
### POST - Create Resources
```
POST /api/users
Body: {"name": "John", "email": "john@example.com"}
→ 201 Created
Location: /api/users/123
Body: {"id": "123", "name": "John", ...}
POST /api/users (validation error)
→ 422 Unprocessable Entity
Body: {"errors": [...]}
```
### PUT - Replace Resources
```
PUT /api/users/{id}
Body: {complete user object}
→ 200 OK (updated)
→ 404 Not Found (doesn't exist)
# Must include ALL fields
```
### PATCH - Partial Update
```
PATCH /api/users/{id}
Body: {"name": "Jane"} (only changed fields)
→ 200 OK
→ 404 Not Found
```
### DELETE - Remove Resources
```
DELETE /api/users/{id}
→ 204 No Content (deleted)
→ 404 Not Found
→ 409 Conflict (can't delete due to references)
```
## Filtering, Sorting, and Searching
### Query Parameters
```
# Filtering
GET /api/users?status=active
GET /api/users?role=admin&status=active
# Sorting
GET /api/users?sort=created_at
GET /api/users?sort=-created_at (descending)
GET /api/users?sort=name,created_at
# Searching
GET /api/users?search=john
GET /api/users?q=john
# Field selection (sparse fieldsets)
GET /api/users?fields=id,name,email
```
## Pagination Patterns
### Offset-Based Pagination
```python
GET /api/users?page=2&page_size=20
Response:
{
"items": [...],
"page": 2,
"page_size": 20,
"total": 150,
"pages": 8
}
```
### Cursor-Based Pagination (for large datasets)
```python
GET /api/users?limit=20&cursor=eyJpZCI6MTIzfQ
Response:
{
"items": [...],
"next_cursor": "eyJpZCI6MTQzfQ",
"has_more": true
}
```
### Link Header Pagination (RESTful)
```
GET /api/users?page=2
Response Headers:
Link: <https://api.example.com/users?page=3>; rel="next",
<https://api.example.com/users?page=1>; rel="prev",
<https://api.example.com/users?page=1>; rel="first",
<https://api.example.com/users?page=8>; rel="last"
```
## Versioning Strategies
### URL Versioning (Recommended)
```
/api/v1/users
/api/v2/users
Pros: Clear, easy to route
Cons: Multiple URLs for same resource
```
### Header Versioning
```
GET /api/users
Accept: application/vnd.api+json; version=2
Pros: Clean URLs
Cons: Less visible, harder to test
```
### Query Parameter
```
GET /api/users?version=2
Pros: Easy to test
Cons: Optional parameter can be forgotten
```
## Rate Limiting
### Headers
```
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 742
X-RateLimit-Reset: 1640000000
Response when limited:
429 Too Many Requests
Retry-After: 3600
```
### Implementation Pattern
```python
from fastapi import HTTPException, Request
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, calls: int, period: int):
self.calls = calls
self.period = period
self.cache = {}
def check(self, key: str) -> bool:
now = datetime.now()
if key not in self.cache:
self.cache[key] = []
# Remove old requests
self.cache[key] = [
ts for ts in self.cache[key]
if now - ts < timedelta(seconds=self.period)
]
if len(self.cache[key]) >= self.calls:
return False
self.cache[key].append(now)
return True
limiter = RateLimiter(calls=100, period=60)
@app.get("/api/users")
async def get_users(request: Request):
if not limiter.check(request.client.host):
raise HTTPException(
status_code=429,
headers={"Retry-After": "60"}
)
return {"users": [...]}
```
## Authentication and Authorization
### Bearer Token
```
Authorization: Bearer eyJhbGciOiJIUzI1NiIs...
401 Unauthorized - Missing/invalid token
403 Forbidden - Valid token, insufficient permissions
```
### API Keys
```
X-API-Key: your-api-key-here
```
## Error Response Format
### Consistent Structure
```json
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Request validation failed",
"details": [
{
"field": "email",
"message": "Invalid email format",
"value": "not-an-email"
}
],
"timestamp": "2025-10-16T12:00:00Z",
"path": "/api/users"
}
}
```
### Status Code Guidelines
- `200 OK`: Successful GET, PATCH, PUT
- `201 Created`: Successful POST
- `204 No Content`: Successful DELETE
- `400 Bad Request`: Malformed request
- `401 Unauthorized`: Authentication required
- `403 Forbidden`: Authenticated but not authorized
- `404 Not Found`: Resource doesn't exist
- `409 Conflict`: State conflict (duplicate email, etc.)
- `422 Unprocessable Entity`: Validation errors
- `429 Too Many Requests`: Rate limited
- `500 Internal Server Error`: Server error
- `503 Service Unavailable`: Temporary downtime
## Caching
### Cache Headers
```
# Client caching
Cache-Control: public, max-age=3600
# No caching
Cache-Control: no-cache, no-store, must-revalidate
# Conditional requests
ETag: "33a64df551425fcc55e4d42a148795d9f25f89d4"
If-None-Match: "33a64df551425fcc55e4d42a148795d9f25f89d4"
→ 304 Not Modified
```
## Bulk Operations
### Batch Endpoints
```python
POST /api/users/batch
{
"items": [
{"name": "User1", "email": "user1@example.com"},
{"name": "User2", "email": "user2@example.com"}
]
}
Response:
{
"results": [
{"id": "1", "status": "created"},
{"id": null, "status": "failed", "error": "Email already exists"}
]
}
```
## Idempotency
### Idempotency Keys
```
POST /api/orders
Idempotency-Key: unique-key-123
If duplicate request:
→ 200 OK (return cached response)
```
## CORS Configuration
```python
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://example.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
```
## Documentation with OpenAPI
```python
from fastapi import FastAPI
app = FastAPI(
title="My API",
description="API for managing users",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
@app.get(
"/api/users/{user_id}",
summary="Get user by ID",
response_description="User details",
tags=["Users"]
)
async def get_user(
user_id: str = Path(..., description="The user ID")
):
"""
Retrieve user by ID.
Returns full user profile including:
- Basic information
- Contact details
- Account status
"""
pass
```
## Health and Monitoring Endpoints
```python
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"version": "1.0.0",
"timestamp": datetime.now().isoformat()
}
@app.get("/health/detailed")
async def detailed_health():
return {
"status": "healthy",
"checks": {
"database": await check_database(),
"redis": await check_redis(),
"external_api": await check_external_api()
}
}
```

View File

@@ -0,0 +1,487 @@
---
name: architecture-patterns
description: Implement proven backend architecture patterns including Clean Architecture, Hexagonal Architecture, and Domain-Driven Design. Use when architecting complex backend systems or refactoring existing applications for better maintainability.
---
# Architecture Patterns
Master proven backend architecture patterns including Clean Architecture, Hexagonal Architecture, and Domain-Driven Design to build maintainable, testable, and scalable systems.
## When to Use This Skill
- Designing new backend systems from scratch
- Refactoring monolithic applications for better maintainability
- Establishing architecture standards for your team
- Migrating from tightly coupled to loosely coupled architectures
- Implementing domain-driven design principles
- Creating testable and mockable codebases
- Planning microservices decomposition
## Core Concepts
### 1. Clean Architecture (Uncle Bob)
**Layers (dependency flows inward):**
- **Entities**: Core business models
- **Use Cases**: Application business rules
- **Interface Adapters**: Controllers, presenters, gateways
- **Frameworks & Drivers**: UI, database, external services
**Key Principles:**
- Dependencies point inward
- Inner layers know nothing about outer layers
- Business logic independent of frameworks
- Testable without UI, database, or external services
### 2. Hexagonal Architecture (Ports and Adapters)
**Components:**
- **Domain Core**: Business logic
- **Ports**: Interfaces defining interactions
- **Adapters**: Implementations of ports (database, REST, message queue)
**Benefits:**
- Swap implementations easily (mock for testing)
- Technology-agnostic core
- Clear separation of concerns
### 3. Domain-Driven Design (DDD)
**Strategic Patterns:**
- **Bounded Contexts**: Separate models for different domains
- **Context Mapping**: How contexts relate
- **Ubiquitous Language**: Shared terminology
**Tactical Patterns:**
- **Entities**: Objects with identity
- **Value Objects**: Immutable objects defined by attributes
- **Aggregates**: Consistency boundaries
- **Repositories**: Data access abstraction
- **Domain Events**: Things that happened
## Clean Architecture Pattern
### Directory Structure
```
app/
├── domain/ # Entities & business rules
│ ├── entities/
│ │ ├── user.py
│ │ └── order.py
│ ├── value_objects/
│ │ ├── email.py
│ │ └── money.py
│ └── interfaces/ # Abstract interfaces
│ ├── user_repository.py
│ └── payment_gateway.py
├── use_cases/ # Application business rules
│ ├── create_user.py
│ ├── process_order.py
│ └── send_notification.py
├── adapters/ # Interface implementations
│ ├── repositories/
│ │ ├── postgres_user_repository.py
│ │ └── redis_cache_repository.py
│ ├── controllers/
│ │ └── user_controller.py
│ └── gateways/
│ ├── stripe_payment_gateway.py
│ └── sendgrid_email_gateway.py
└── infrastructure/ # Framework & external concerns
├── database.py
├── config.py
└── logging.py
```
### Implementation Example
```python
# domain/entities/user.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class User:
"""Core user entity - no framework dependencies."""
id: str
email: str
name: str
created_at: datetime
is_active: bool = True
def deactivate(self):
"""Business rule: deactivating user."""
self.is_active = False
def can_place_order(self) -> bool:
"""Business rule: active users can order."""
return self.is_active
# domain/interfaces/user_repository.py
from abc import ABC, abstractmethod
from typing import Optional, List
from domain.entities.user import User
class IUserRepository(ABC):
"""Port: defines contract, no implementation."""
@abstractmethod
async def find_by_id(self, user_id: str) -> Optional[User]:
pass
@abstractmethod
async def find_by_email(self, email: str) -> Optional[User]:
pass
@abstractmethod
async def save(self, user: User) -> User:
pass
@abstractmethod
async def delete(self, user_id: str) -> bool:
pass
# use_cases/create_user.py
from domain.entities.user import User
from domain.interfaces.user_repository import IUserRepository
from dataclasses import dataclass
from datetime import datetime
import uuid
@dataclass
class CreateUserRequest:
email: str
name: str
@dataclass
class CreateUserResponse:
user: User
success: bool
error: Optional[str] = None
class CreateUserUseCase:
"""Use case: orchestrates business logic."""
def __init__(self, user_repository: IUserRepository):
self.user_repository = user_repository
async def execute(self, request: CreateUserRequest) -> CreateUserResponse:
# Business validation
existing = await self.user_repository.find_by_email(request.email)
if existing:
return CreateUserResponse(
user=None,
success=False,
error="Email already exists"
)
# Create entity
user = User(
id=str(uuid.uuid4()),
email=request.email,
name=request.name,
created_at=datetime.now(),
is_active=True
)
# Persist
saved_user = await self.user_repository.save(user)
return CreateUserResponse(
user=saved_user,
success=True
)
# adapters/repositories/postgres_user_repository.py
from domain.interfaces.user_repository import IUserRepository
from domain.entities.user import User
from typing import Optional
import asyncpg
class PostgresUserRepository(IUserRepository):
"""Adapter: PostgreSQL implementation."""
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def find_by_id(self, user_id: str) -> Optional[User]:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return self._to_entity(row) if row else None
async def find_by_email(self, email: str) -> Optional[User]:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM users WHERE email = $1", email
)
return self._to_entity(row) if row else None
async def save(self, user: User) -> User:
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO users (id, email, name, created_at, is_active)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE
SET email = $2, name = $3, is_active = $5
""",
user.id, user.email, user.name, user.created_at, user.is_active
)
return user
async def delete(self, user_id: str) -> bool:
async with self.pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM users WHERE id = $1", user_id
)
return result == "DELETE 1"
def _to_entity(self, row) -> User:
"""Map database row to entity."""
return User(
id=row["id"],
email=row["email"],
name=row["name"],
created_at=row["created_at"],
is_active=row["is_active"]
)
# adapters/controllers/user_controller.py
from fastapi import APIRouter, Depends, HTTPException
from use_cases.create_user import CreateUserUseCase, CreateUserRequest
from pydantic import BaseModel
router = APIRouter()
class CreateUserDTO(BaseModel):
email: str
name: str
@router.post("/users")
async def create_user(
dto: CreateUserDTO,
use_case: CreateUserUseCase = Depends(get_create_user_use_case)
):
"""Controller: handles HTTP concerns only."""
request = CreateUserRequest(email=dto.email, name=dto.name)
response = await use_case.execute(request)
if not response.success:
raise HTTPException(status_code=400, detail=response.error)
return {"user": response.user}
```
## Hexagonal Architecture Pattern
```python
# Core domain (hexagon center)
class OrderService:
"""Domain service - no infrastructure dependencies."""
def __init__(
self,
order_repository: OrderRepositoryPort,
payment_gateway: PaymentGatewayPort,
notification_service: NotificationPort
):
self.orders = order_repository
self.payments = payment_gateway
self.notifications = notification_service
async def place_order(self, order: Order) -> OrderResult:
# Business logic
if not order.is_valid():
return OrderResult(success=False, error="Invalid order")
# Use ports (interfaces)
payment = await self.payments.charge(
amount=order.total,
customer=order.customer_id
)
if not payment.success:
return OrderResult(success=False, error="Payment failed")
order.mark_as_paid()
saved_order = await self.orders.save(order)
await self.notifications.send(
to=order.customer_email,
subject="Order confirmed",
body=f"Order {order.id} confirmed"
)
return OrderResult(success=True, order=saved_order)
# Ports (interfaces)
class OrderRepositoryPort(ABC):
@abstractmethod
async def save(self, order: Order) -> Order:
pass
class PaymentGatewayPort(ABC):
@abstractmethod
async def charge(self, amount: Money, customer: str) -> PaymentResult:
pass
class NotificationPort(ABC):
@abstractmethod
async def send(self, to: str, subject: str, body: str):
pass
# Adapters (implementations)
class StripePaymentAdapter(PaymentGatewayPort):
"""Primary adapter: connects to Stripe API."""
def __init__(self, api_key: str):
self.stripe = stripe
self.stripe.api_key = api_key
async def charge(self, amount: Money, customer: str) -> PaymentResult:
try:
charge = self.stripe.Charge.create(
amount=amount.cents,
currency=amount.currency,
customer=customer
)
return PaymentResult(success=True, transaction_id=charge.id)
except stripe.error.CardError as e:
return PaymentResult(success=False, error=str(e))
class MockPaymentAdapter(PaymentGatewayPort):
"""Test adapter: no external dependencies."""
async def charge(self, amount: Money, customer: str) -> PaymentResult:
return PaymentResult(success=True, transaction_id="mock-123")
```
## Domain-Driven Design Pattern
```python
# Value Objects (immutable)
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class Email:
"""Value object: validated email."""
value: str
def __post_init__(self):
if "@" not in self.value:
raise ValueError("Invalid email")
@dataclass(frozen=True)
class Money:
"""Value object: amount with currency."""
amount: int # cents
currency: str
def add(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError("Currency mismatch")
return Money(self.amount + other.amount, self.currency)
# Entities (with identity)
class Order:
"""Entity: has identity, mutable state."""
def __init__(self, id: str, customer: Customer):
self.id = id
self.customer = customer
self.items: List[OrderItem] = []
self.status = OrderStatus.PENDING
self._events: List[DomainEvent] = []
def add_item(self, product: Product, quantity: int):
"""Business logic in entity."""
item = OrderItem(product, quantity)
self.items.append(item)
self._events.append(ItemAddedEvent(self.id, item))
def total(self) -> Money:
"""Calculated property."""
return sum(item.subtotal() for item in self.items)
def submit(self):
"""State transition with business rules."""
if not self.items:
raise ValueError("Cannot submit empty order")
if self.status != OrderStatus.PENDING:
raise ValueError("Order already submitted")
self.status = OrderStatus.SUBMITTED
self._events.append(OrderSubmittedEvent(self.id))
# Aggregates (consistency boundary)
class Customer:
"""Aggregate root: controls access to entities."""
def __init__(self, id: str, email: Email):
self.id = id
self.email = email
self._addresses: List[Address] = []
self._orders: List[str] = [] # Order IDs, not full objects
def add_address(self, address: Address):
"""Aggregate enforces invariants."""
if len(self._addresses) >= 5:
raise ValueError("Maximum 5 addresses allowed")
self._addresses.append(address)
@property
def primary_address(self) -> Optional[Address]:
return next((a for a in self._addresses if a.is_primary), None)
# Domain Events
@dataclass
class OrderSubmittedEvent:
order_id: str
occurred_at: datetime = field(default_factory=datetime.now)
# Repository (aggregate persistence)
class OrderRepository:
"""Repository: persist/retrieve aggregates."""
async def find_by_id(self, order_id: str) -> Optional[Order]:
"""Reconstitute aggregate from storage."""
pass
async def save(self, order: Order):
"""Persist aggregate and publish events."""
await self._persist(order)
await self._publish_events(order._events)
order._events.clear()
```
## Resources
- **references/clean-architecture-guide.md**: Detailed layer breakdown
- **references/hexagonal-architecture-guide.md**: Ports and adapters patterns
- **references/ddd-tactical-patterns.md**: Entities, value objects, aggregates
- **assets/clean-architecture-template/**: Complete project structure
- **assets/ddd-examples/**: Domain modeling examples
## Best Practices
1. **Dependency Rule**: Dependencies always point inward
2. **Interface Segregation**: Small, focused interfaces
3. **Business Logic in Domain**: Keep frameworks out of core
4. **Test Independence**: Core testable without infrastructure
5. **Bounded Contexts**: Clear domain boundaries
6. **Ubiquitous Language**: Consistent terminology
7. **Thin Controllers**: Delegate to use cases
8. **Rich Domain Models**: Behavior with data
## Common Pitfalls
- **Anemic Domain**: Entities with only data, no behavior
- **Framework Coupling**: Business logic depends on frameworks
- **Fat Controllers**: Business logic in controllers
- **Repository Leakage**: Exposing ORM objects
- **Missing Abstractions**: Concrete dependencies in core
- **Over-Engineering**: Clean architecture for simple CRUD

View File

@@ -0,0 +1,585 @@
---
name: microservices-patterns
description: Design microservices architectures with service boundaries, event-driven communication, and resilience patterns. Use when building distributed systems, decomposing monoliths, or implementing microservices.
---
# Microservices Patterns
Master microservices architecture patterns including service boundaries, inter-service communication, data management, and resilience patterns for building distributed systems.
## When to Use This Skill
- Decomposing monoliths into microservices
- Designing service boundaries and contracts
- Implementing inter-service communication
- Managing distributed data and transactions
- Building resilient distributed systems
- Implementing service discovery and load balancing
- Designing event-driven architectures
## Core Concepts
### 1. Service Decomposition Strategies
**By Business Capability**
- Organize services around business functions
- Each service owns its domain
- Example: OrderService, PaymentService, InventoryService
**By Subdomain (DDD)**
- Core domain, supporting subdomains
- Bounded contexts map to services
- Clear ownership and responsibility
**Strangler Fig Pattern**
- Gradually extract from monolith
- New functionality as microservices
- Proxy routes to old/new systems
### 2. Communication Patterns
**Synchronous (Request/Response)**
- REST APIs
- gRPC
- GraphQL
**Asynchronous (Events/Messages)**
- Event streaming (Kafka)
- Message queues (RabbitMQ, SQS)
- Pub/Sub patterns
### 3. Data Management
**Database Per Service**
- Each service owns its data
- No shared databases
- Loose coupling
**Saga Pattern**
- Distributed transactions
- Compensating actions
- Eventual consistency
### 4. Resilience Patterns
**Circuit Breaker**
- Fail fast on repeated errors
- Prevent cascade failures
**Retry with Backoff**
- Transient fault handling
- Exponential backoff
**Bulkhead**
- Isolate resources
- Limit impact of failures
## Service Decomposition Patterns
### Pattern 1: By Business Capability
```python
# E-commerce example
# Order Service
class OrderService:
"""Handles order lifecycle."""
async def create_order(self, order_data: dict) -> Order:
order = Order.create(order_data)
# Publish event for other services
await self.event_bus.publish(
OrderCreatedEvent(
order_id=order.id,
customer_id=order.customer_id,
items=order.items,
total=order.total
)
)
return order
# Payment Service (separate service)
class PaymentService:
"""Handles payment processing."""
async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
# Process payment
result = await self.payment_gateway.charge(
amount=payment_request.amount,
customer=payment_request.customer_id
)
if result.success:
await self.event_bus.publish(
PaymentCompletedEvent(
order_id=payment_request.order_id,
transaction_id=result.transaction_id
)
)
return result
# Inventory Service (separate service)
class InventoryService:
"""Handles inventory management."""
async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
# Check availability
for item in items:
available = await self.inventory_repo.get_available(item.product_id)
if available < item.quantity:
return ReservationResult(
success=False,
error=f"Insufficient inventory for {item.product_id}"
)
# Reserve items
reservation = await self.create_reservation(order_id, items)
await self.event_bus.publish(
InventoryReservedEvent(
order_id=order_id,
reservation_id=reservation.id
)
)
return ReservationResult(success=True, reservation=reservation)
```
### Pattern 2: API Gateway
```python
from fastapi import FastAPI, HTTPException, Depends
import httpx
from circuitbreaker import circuit
app = FastAPI()
class APIGateway:
"""Central entry point for all client requests."""
def __init__(self):
self.order_service_url = "http://order-service:8000"
self.payment_service_url = "http://payment-service:8001"
self.inventory_service_url = "http://inventory-service:8002"
self.http_client = httpx.AsyncClient(timeout=5.0)
@circuit(failure_threshold=5, recovery_timeout=30)
async def call_order_service(self, path: str, method: str = "GET", **kwargs):
"""Call order service with circuit breaker."""
response = await self.http_client.request(
method,
f"{self.order_service_url}{path}",
**kwargs
)
response.raise_for_status()
return response.json()
async def create_order_aggregate(self, order_id: str) -> dict:
"""Aggregate data from multiple services."""
# Parallel requests
order, payment, inventory = await asyncio.gather(
self.call_order_service(f"/orders/{order_id}"),
self.call_payment_service(f"/payments/order/{order_id}"),
self.call_inventory_service(f"/reservations/order/{order_id}"),
return_exceptions=True
)
# Handle partial failures
result = {"order": order}
if not isinstance(payment, Exception):
result["payment"] = payment
if not isinstance(inventory, Exception):
result["inventory"] = inventory
return result
@app.post("/api/orders")
async def create_order(
order_data: dict,
gateway: APIGateway = Depends()
):
"""API Gateway endpoint."""
try:
# Route to order service
order = await gateway.call_order_service(
"/orders",
method="POST",
json=order_data
)
return {"order": order}
except httpx.HTTPError as e:
raise HTTPException(status_code=503, detail="Order service unavailable")
```
## Communication Patterns
### Pattern 1: Synchronous REST Communication
```python
# Service A calls Service B
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient:
"""HTTP client with retries and timeout."""
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(5.0, connect=2.0),
limits=httpx.Limits(max_keepalive_connections=20)
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def get(self, path: str, **kwargs):
"""GET with automatic retries."""
response = await self.client.get(f"{self.base_url}{path}", **kwargs)
response.raise_for_status()
return response.json()
async def post(self, path: str, **kwargs):
"""POST request."""
response = await self.client.post(f"{self.base_url}{path}", **kwargs)
response.raise_for_status()
return response.json()
# Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.post("/payments", json=payment_data)
```
### Pattern 2: Asynchronous Event-Driven
```python
# Event-driven communication with Kafka
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
occurred_at: datetime
data: dict
class EventBus:
"""Event publishing and subscription."""
def __init__(self, bootstrap_servers: List[str]):
self.bootstrap_servers = bootstrap_servers
self.producer = None
async def start(self):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode()
)
await self.producer.start()
async def publish(self, event: DomainEvent):
"""Publish event to Kafka topic."""
topic = event.event_type
await self.producer.send_and_wait(
topic,
value=asdict(event),
key=event.aggregate_id.encode()
)
async def subscribe(self, topic: str, handler: callable):
"""Subscribe to events."""
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=self.bootstrap_servers,
value_deserializer=lambda v: json.loads(v.decode()),
group_id="my-service"
)
await consumer.start()
try:
async for message in consumer:
event_data = message.value
await handler(event_data)
finally:
await consumer.stop()
# Order Service publishes event
async def create_order(order_data: dict):
order = await save_order(order_data)
event = DomainEvent(
event_id=str(uuid.uuid4()),
event_type="OrderCreated",
aggregate_id=order.id,
occurred_at=datetime.now(),
data={
"order_id": order.id,
"customer_id": order.customer_id,
"total": order.total
}
)
await event_bus.publish(event)
# Inventory Service listens for OrderCreated
async def handle_order_created(event_data: dict):
"""React to order creation."""
order_id = event_data["data"]["order_id"]
items = event_data["data"]["items"]
# Reserve inventory
await reserve_inventory(order_id, items)
```
### Pattern 3: Saga Pattern (Distributed Transactions)
```python
# Saga orchestration for order fulfillment
from enum import Enum
from typing import List, Callable
class SagaStep:
"""Single step in saga."""
def __init__(
self,
name: str,
action: Callable,
compensation: Callable
):
self.name = name
self.action = action
self.compensation = compensation
class SagaStatus(Enum):
PENDING = "pending"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
class OrderFulfillmentSaga:
"""Orchestrated saga for order fulfillment."""
def __init__(self):
self.steps: List[SagaStep] = [
SagaStep(
"create_order",
action=self.create_order,
compensation=self.cancel_order
),
SagaStep(
"reserve_inventory",
action=self.reserve_inventory,
compensation=self.release_inventory
),
SagaStep(
"process_payment",
action=self.process_payment,
compensation=self.refund_payment
),
SagaStep(
"confirm_order",
action=self.confirm_order,
compensation=self.cancel_order_confirmation
)
]
async def execute(self, order_data: dict) -> SagaResult:
"""Execute saga steps."""
completed_steps = []
context = {"order_data": order_data}
try:
for step in self.steps:
# Execute step
result = await step.action(context)
if not result.success:
# Compensate
await self.compensate(completed_steps, context)
return SagaResult(
status=SagaStatus.FAILED,
error=result.error
)
completed_steps.append(step)
context.update(result.data)
return SagaResult(status=SagaStatus.COMPLETED, data=context)
except Exception as e:
# Compensate on error
await self.compensate(completed_steps, context)
return SagaResult(status=SagaStatus.FAILED, error=str(e))
async def compensate(self, completed_steps: List[SagaStep], context: dict):
"""Execute compensating actions in reverse order."""
for step in reversed(completed_steps):
try:
await step.compensation(context)
except Exception as e:
# Log compensation failure
print(f"Compensation failed for {step.name}: {e}")
# Step implementations
async def create_order(self, context: dict) -> StepResult:
order = await order_service.create(context["order_data"])
return StepResult(success=True, data={"order_id": order.id})
async def cancel_order(self, context: dict):
await order_service.cancel(context["order_id"])
async def reserve_inventory(self, context: dict) -> StepResult:
result = await inventory_service.reserve(
context["order_id"],
context["order_data"]["items"]
)
return StepResult(
success=result.success,
data={"reservation_id": result.reservation_id}
)
async def release_inventory(self, context: dict):
await inventory_service.release(context["reservation_id"])
async def process_payment(self, context: dict) -> StepResult:
result = await payment_service.charge(
context["order_id"],
context["order_data"]["total"]
)
return StepResult(
success=result.success,
data={"transaction_id": result.transaction_id},
error=result.error
)
async def refund_payment(self, context: dict):
await payment_service.refund(context["transaction_id"])
```
## Resilience Patterns
### Circuit Breaker Pattern
```python
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
class CircuitBreaker:
"""Circuit breaker for service calls."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker."""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Handle successful call."""
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def _on_failure(self):
"""Handle failed call."""
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
def _should_attempt_reset(self) -> bool:
"""Check if enough time passed to try again."""
return (
datetime.now() - self.opened_at
> timedelta(seconds=self.recovery_timeout)
)
# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(payment_data: dict):
return await breaker.call(
payment_client.process_payment,
payment_data
)
```
## Resources
- **references/service-decomposition-guide.md**: Breaking down monoliths
- **references/communication-patterns.md**: Sync vs async patterns
- **references/saga-implementation.md**: Distributed transactions
- **assets/circuit-breaker.py**: Production circuit breaker
- **assets/event-bus-template.py**: Kafka event bus implementation
- **assets/api-gateway-template.py**: Complete API gateway
## Best Practices
1. **Service Boundaries**: Align with business capabilities
2. **Database Per Service**: No shared databases
3. **API Contracts**: Versioned, backward compatible
4. **Async When Possible**: Events over direct calls
5. **Circuit Breakers**: Fail fast on service failures
6. **Distributed Tracing**: Track requests across services
7. **Service Registry**: Dynamic service discovery
8. **Health Checks**: Liveness and readiness probes
## Common Pitfalls
- **Distributed Monolith**: Tightly coupled services
- **Chatty Services**: Too many inter-service calls
- **Shared Databases**: Tight coupling through data
- **No Circuit Breakers**: Cascade failures
- **Synchronous Everything**: Tight coupling, poor resilience
- **Premature Microservices**: Starting with microservices
- **Ignoring Network Failures**: Assuming reliable network
- **No Compensation Logic**: Can't undo failed transactions