Database Operations Pattern
Use Case: Read, write, sync, and manage database data in workflows.
Pattern Structure
Trigger → [Query/Read] → [Transform] → [Write/Update] → [Verify/Log]
Key Characteristic: Data persistence and synchronization
Core Components
1. Trigger
Options:
- Schedule - Periodic sync/maintenance (most common)
- Webhook - Event-driven writes
- Manual - One-time operations
2. Database Read Nodes
Supported databases:
- Postgres
- MySQL
- MongoDB
- Microsoft SQL
- SQLite
- Redis
- And more via community nodes
3. Transform
Purpose: Map between different database schemas or formats
Typical nodes:
- Set - Field mapping
- Code - Complex transformations
- Merge - Combine data from multiple sources
4. Database Write Nodes
Operations:
- INSERT - Create new records
- UPDATE - Modify existing records
- UPSERT - Insert or update
- DELETE - Remove records
5. Verification
Purpose: Confirm operations succeeded
Methods:
- Query to verify records
- Count rows affected
- Log results
Common Use Cases
1. Data Synchronization
Flow: Schedule → Read Source DB → Transform → Write Target DB → Log
Example (Postgres to MySQL sync):
1. Schedule (every 15 minutes)
2. Postgres ("Get Last Sync" - SELECT last_sync FROM sync_log)
3. Postgres (SELECT * FROM users WHERE updated_at > last_sync)
4. IF (check if records exist)
5. Set (map Postgres schema to MySQL schema)
6. MySQL (INSERT or UPDATE users)
7. Postgres (UPDATE sync_log SET last_sync = NOW())
8. Slack (notify: "Synced X users")
Incremental sync query:
SELECT *
FROM users
WHERE updated_at > $1
ORDER BY updated_at ASC
LIMIT 1000
Parameters:
{
"parameters": [
"={{$node['Get Last Sync'].json.last_sync}}"
]
}
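The "Get Last Sync" node referenced by the parameter above is assumed to read a watermark from the sync_log table that step 7 updates; a minimal configuration sketch:
{
  operation: "executeQuery",
  query: "SELECT COALESCE(MAX(last_sync), '1970-01-01') AS last_sync FROM sync_log"
}
The COALESCE fallback keeps the first run from failing while sync_log is still empty.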
2. ETL (Extract, Transform, Load)
Flow: Extract from multiple sources → Transform → Load into warehouse
Example (Consolidate data):
1. Schedule (daily at 2 AM)
2. [Parallel branches]
├─ Postgres (SELECT orders)
├─ MySQL (SELECT customers)
└─ MongoDB (SELECT products)
3. Merge (combine all data)
4. Code (transform to warehouse schema - see the sketch after this list)
5. Postgres (warehouse - INSERT into fact_sales)
6. Email (send summary report)
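Step 4 above could look like the Code node below; the fact_sales column names (order_id, customer_id, product_id, amount, order_date) are illustrative assumptions, not a fixed schema:
// Code node - map merged source rows to an assumed fact_sales schema
return $input.all().map(item => ({
  json: {
    order_id: item.json.order_id,
    customer_id: item.json.customer_id,
    product_id: item.json.product_id,
    amount: Number(item.json.total) || 0,
    order_date: new Date(item.json.created_at).toISOString().split('T')[0]
  }
}));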
3. Data Validation & Cleanup
Flow: Schedule → Query → Validate → Update/Delete invalid records
Example (Clean orphaned records):
1. Schedule (weekly)
2. Postgres (SELECT users WHERE email IS NULL OR email = '')
3. IF (invalid records exist)
4. Postgres (UPDATE users SET status='inactive' WHERE email IS NULL)
5. Postgres (DELETE FROM users WHERE created_at < NOW() - INTERVAL '1 year' AND status='inactive')
6. Slack (alert: "Cleaned X invalid records")
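One way to build the Slack text in step 6 is to have the UPDATE return the affected rows (for example by appending RETURNING id to the query) and count the items in a Code node; a sketch, assuming that configuration:
// Code node - build the notification text from the rows returned by the UPDATE
const cleaned = $input.all().length; // one item per row returned via RETURNING id
return [{ json: { text: `Cleaned ${cleaned} invalid records` } }];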
4. Backup & Archive
Flow: Schedule → Query → Export → Store
Example (Archive old records):
1. Schedule (monthly)
2. Postgres (SELECT * FROM orders WHERE created_at < NOW() - INTERVAL '2 years')
3. Code (convert to JSON - see the sketch after this list)
4. Write File (save to archive.json)
5. Google Drive (upload archive)
6. Postgres (DELETE FROM orders WHERE created_at < NOW() - INTERVAL '2 years')
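Step 3 can serialize the selected rows into a single JSON payload for the file and upload steps; a minimal sketch (fileName and content are illustrative field names):
// Code node - serialize all archived rows into one JSON document
const rows = $input.all().map(item => item.json);
return [{
  json: {
    fileName: 'archive.json',
    content: JSON.stringify(rows, null, 2),
    rowCount: rows.length
  }
}];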
5. Real-time Data Updates
Flow: Webhook → Parse → Update Database
Example (Update user status):
1. Webhook (receive status update)
2. Postgres (parameterized UPDATE users SET status = $1 WHERE id = $2 - see the sketch after this list)
3. IF (rows affected > 0)
4. Redis (SET user:{{$json.body.user_id}}:status {{$json.body.status}})
5. Webhook Response ({"success": true})
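A parameterized version of the UPDATE in step 2, consistent with the security guidance later in this document:
{
  operation: "executeQuery",
  query: "UPDATE users SET status = $1 WHERE id = $2",
  parameters: [
    "={{$json.body.status}}",
    "={{$json.body.user_id}}"
  ]
}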
Database Node Configuration
Postgres
SELECT Query
{
operation: "executeQuery",
query: "SELECT id, name, email FROM users WHERE created_at > $1 LIMIT $2",
parameters: [
"={{$json.since_date}}",
"100"
]
}
INSERT
{
operation: "insert",
table: "users",
columns: "id, name, email, created_at",
values: [
{
id: "={{$json.id}}",
name: "={{$json.name}}",
email: "={{$json.email}}",
created_at: "={{$now}}"
}
]
}
UPDATE
{
operation: "update",
table: "users",
updateKey: "id",
columns: "name, email, updated_at",
values: {
id: "={{$json.id}}",
name: "={{$json.name}}",
email: "={{$json.email}}",
updated_at: "={{$now}}"
}
}
UPSERT (INSERT ... ON CONFLICT)
{
operation: "executeQuery",
query: `
INSERT INTO users (id, name, email)
VALUES ($1, $2, $3)
ON CONFLICT (id)
DO UPDATE SET name = $2, email = $3, updated_at = NOW()
`,
parameters: [
"={{$json.id}}",
"={{$json.name}}",
"={{$json.email}}"
]
}
MySQL
SELECT with JOIN
{
operation: "executeQuery",
query: `
SELECT u.id, u.name, o.order_id, o.total
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > ?
`,
parameters: [
"={{$json.since_date}}"
]
}
Bulk INSERT
{
operation: "insert",
table: "orders",
columns: "user_id, total, status",
values: $json.orders // Array of objects
}
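MySQL's counterpart to the Postgres upsert shown above is INSERT ... ON DUPLICATE KEY UPDATE; a sketch using the same users columns (the updated_at column is assumed, as in the Postgres example):
{
  operation: "executeQuery",
  query: `
    INSERT INTO users (id, name, email)
    VALUES (?, ?, ?)
    ON DUPLICATE KEY UPDATE name = VALUES(name), email = VALUES(email), updated_at = NOW()
  `,
  parameters: [
    "={{$json.id}}",
    "={{$json.name}}",
    "={{$json.email}}"
  ]
}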
MongoDB
Find Documents
{
operation: "find",
collection: "users",
query: JSON.stringify({
created_at: { $gt: new Date($json.since_date) },
status: "active"
}),
limit: 100
}
Insert Document
{
operation: "insert",
collection: "users",
document: JSON.stringify({
name: $json.name,
email: $json.email,
created_at: new Date()
})
}
Update Document
{
operation: "update",
collection: "users",
query: JSON.stringify({ _id: $json.user_id }),
update: JSON.stringify({
$set: {
status: $json.status,
updated_at: new Date()
}
})
}
Batch Processing
Pattern 1: Split In Batches
Use when: Processing large datasets to avoid memory issues
Postgres (SELECT 10000 records)
→ Split In Batches (100 items per batch)
→ Transform
→ MySQL (write batch)
→ Loop (until all processed)
Pattern 2: Paginated Queries
Use when: Database has millions of records
Set (initialize: offset=0, limit=1000)
→ Loop Start
→ Postgres (SELECT * FROM large_table LIMIT {{$json.limit}} OFFSET {{$json.offset}})
→ IF (records returned)
├─ Process records
├─ Set (increment offset by 1000 - see the sketch below)
└─ Loop back
└─ [No records] → End
Query:
SELECT * FROM large_table
ORDER BY id
LIMIT $1 OFFSET $2
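The increment step in the loop above can be a small Code node that carries the paging state forward (this assumes limit and offset are passed along on the current item):
// Code node - advance the paging state for the next iteration
const { limit = 1000, offset = 0 } = $input.first().json;
return [{ json: { limit, offset: offset + limit } }];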
Pattern 3: Cursor-Based Pagination
Better performance for large datasets:
Set (initialize: last_id=0)
→ Loop Start
→ Postgres (SELECT * FROM table WHERE id > {{$json.last_id}} ORDER BY id LIMIT 1000)
→ IF (records returned)
├─ Process records
├─ Code (get max id from batch - see the sketch below)
└─ Loop back
└─ [No records] → End
Query:
SELECT * FROM table
WHERE id > $1
ORDER BY id ASC
LIMIT 1000
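The cursor step above can compute the new last_id from the rows just fetched:
// Code node - compute the cursor (highest id) from the current batch
const items = $input.all();
const lastId = items.reduce((max, item) => Math.max(max, Number(item.json.id)), 0);
return [{ json: { last_id: lastId } }];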
Transaction Handling
Pattern 1: BEGIN/COMMIT/ROLLBACK
For databases that support transactions. Note that separate nodes may not share the same database connection, so a BEGIN issued in one node is not guaranteed to still be open when a later node runs COMMIT; where possible, send the whole transaction in a single query or use the atomic pattern below:
// Node 1: Begin Transaction
{
operation: "executeQuery",
query: "BEGIN"
}
// Node 2-N: Your operations (enable Continue On Fail in the node settings)
{
operation: "executeQuery",
query: "INSERT INTO ...",
continueOnFail: true
}
// Node N+1: Commit or Rollback
{
operation: "executeQuery",
query: "={{$node['Operation'].json.error ? 'ROLLBACK' : 'COMMIT'}}"
}
Pattern 2: Atomic Operations
Use database features for atomicity:
-- Upsert example (atomic)
INSERT INTO inventory (product_id, quantity)
VALUES ($1, $2)
ON CONFLICT (product_id)
DO UPDATE SET quantity = inventory.quantity + $2
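As a node configuration, the same atomic upsert might look like this (product_id and quantity are taken from the incoming item):
{
  operation: "executeQuery",
  query: `
    INSERT INTO inventory (product_id, quantity)
    VALUES ($1, $2)
    ON CONFLICT (product_id)
    DO UPDATE SET quantity = inventory.quantity + $2
  `,
  parameters: [
    "={{$json.product_id}}",
    "={{$json.quantity}}"
  ]
}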
Pattern 3: Error Rollback
Manual rollback on error:
Try Operations:
Postgres (INSERT orders)
MySQL (INSERT order_items)
Error Trigger:
Postgres (DELETE FROM orders WHERE id = {{$json.order_id}})
MySQL (DELETE FROM order_items WHERE order_id = {{$json.order_id}})
Data Transformation
Schema Mapping
// Code node - map schemas
const sourceData = $input.all();
return sourceData.map(item => ({
json: {
// Source → Target mapping
user_id: item.json.id,
full_name: `${item.json.first_name} ${item.json.last_name}`,
email_address: item.json.email,
registration_date: new Date(item.json.created_at).toISOString(),
// Computed fields
is_premium: item.json.plan_type === 'pro',
// Default values
status: item.json.status || 'active'
}
}));
Data Type Conversions
// Code node - convert data types
return $input.all().map(item => ({
json: {
// String to number
user_id: parseInt(item.json.user_id),
// String to date
created_at: new Date(item.json.created_at),
// Number to boolean
is_active: item.json.active === 1,
// JSON string to object
metadata: JSON.parse(item.json.metadata || '{}'),
// Null handling
email: item.json.email || null
}
}));
Aggregation
// Code node - aggregate data
const items = $input.all();
const summary = items.reduce((acc, item) => {
const date = item.json.created_at.split('T')[0];
if (!acc[date]) {
acc[date] = { count: 0, total: 0 };
}
acc[date].count++;
acc[date].total += item.json.amount;
return acc;
}, {});
return Object.entries(summary).map(([date, data]) => ({
json: {
date,
count: data.count,
total: data.total,
average: data.total / data.count
}
}));
Performance Optimization
1. Use Indexes
Ensure the database has the indexes your workflow queries rely on:
-- Add index for sync queries
CREATE INDEX idx_users_updated_at ON users(updated_at);
-- Add index for lookups
CREATE INDEX idx_orders_user_id ON orders(user_id);
2. Limit Result Sets
Always use LIMIT:
-- ✅ Good
SELECT * FROM large_table
WHERE created_at > $1
LIMIT 1000
-- ❌ Bad (unbounded)
SELECT * FROM large_table
WHERE created_at > $1
3. Use Prepared Statements
Parameterized queries let the database cache and reuse execution plans (and prevent SQL injection):
// ✅ Good - prepared statement
{
query: "SELECT * FROM users WHERE id = $1",
parameters: ["={{$json.id}}"]
}
// ❌ Bad - string concatenation
{
query: "SELECT * FROM users WHERE id = '={{$json.id}}'"
}
4. Batch Writes
Write multiple records at once:
// ✅ Good - batch insert
{
operation: "insert",
table: "orders",
values: $json.items // Array of 100 items
}
// ❌ Bad - individual inserts in loop
// 100 separate INSERT statements
5. Connection Pooling
Configure in credentials:
{
host: "db.example.com",
database: "mydb",
user: "user",
password: "pass",
// Connection pool settings
min: 2,
max: 10,
idleTimeoutMillis: 30000
}
Error Handling
Pattern 1: Check Rows Affected
Database Operation (UPDATE users...)
→ IF ({{$json.rowsAffected === 0}})
└─ Alert: "No rows updated - record not found"
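If you prefer to fail the execution instead of branching, a Code node can throw when nothing was updated. This reuses the rowsAffected field from the flow above; the exact field name depends on the database node and version, so treat it as an assumption:
// Code node - stop the workflow when the UPDATE matched no rows
const rowsAffected = $input.first().json.rowsAffected ?? 0;
if (rowsAffected === 0) {
  throw new Error('No rows updated - record not found');
}
return $input.all();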
Pattern 2: Constraint Violations
// Database operation with continueOnFail: true
{
operation: "insert",
continueOnFail: true
}
// Next node: Check for errors
IF ({{$json.error !== undefined}})
→ IF ({{$json.error.includes('duplicate key')}})
└─ Log: "Record already exists - skipping"
→ ELSE
└─ Alert: "Database error: {{$json.error}}"
Pattern 3: Rollback on Error
Try Operations:
→ Database Write 1
→ Database Write 2
→ Database Write 3
Error Trigger:
→ Rollback Operations
→ Alert Admin
Security Best Practices
1. Use Parameterized Queries (Prevent SQL Injection)
// ✅ SAFE - parameterized
{
query: "SELECT * FROM users WHERE email = $1",
parameters: ["={{$json.email}}"]
}
// ❌ DANGEROUS - SQL injection risk
{
query: "SELECT * FROM users WHERE email = '={{$json.email}}'"
}
2. Least Privilege Access
Create a dedicated workflow user:
-- ✅ Good - limited permissions
CREATE USER n8n_workflow WITH PASSWORD 'secure_password';
GRANT SELECT, INSERT, UPDATE ON orders TO n8n_workflow;
GRANT SELECT ON users TO n8n_workflow;
-- ❌ Bad - too much access
GRANT ALL PRIVILEGES TO n8n_workflow;
3. Validate Input Data
// Code node - validate before write
const email = $json.email;
const name = $json.name;
// Validation
if (!email || !email.includes('@')) {
throw new Error('Invalid email address');
}
if (!name || name.length < 2) {
throw new Error('Invalid name');
}
// Sanitization
return [{
json: {
email: email.toLowerCase().trim(),
name: name.trim()
}
}];
4. Encrypt Sensitive Data
// Code node - encrypt before storage
const crypto = require('crypto');
const algorithm = 'aes-256-cbc';
const key = Buffer.from($credentials.encryptionKey, 'hex');
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(algorithm, key, iv);
let encrypted = cipher.update($json.sensitive_data, 'utf8', 'hex');
encrypted += cipher.final('hex');
return [{
json: {
encrypted_data: encrypted,
iv: iv.toString('hex')
}
}];
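The matching read path reverses the operation with the stored IV; a sketch that assumes encrypted_data and iv were saved exactly as produced above:
// Code node - decrypt data stored by the block above
const crypto = require('crypto');
const algorithm = 'aes-256-cbc';
const key = Buffer.from($credentials.encryptionKey, 'hex');
const iv = Buffer.from($json.iv, 'hex');
const decipher = crypto.createDecipheriv(algorithm, key, iv);
let decrypted = decipher.update($json.encrypted_data, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return [{
  json: {
    sensitive_data: decrypted
  }
}];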
Common Gotchas
1. ❌ Wrong: Unbounded queries
SELECT * FROM large_table -- Could return millions of rows
✅ Correct: Use LIMIT
SELECT * FROM large_table
ORDER BY created_at DESC
LIMIT 1000
2. ❌ Wrong: String concatenation in queries
query: "SELECT * FROM users WHERE id = '{{$json.id}}'"
✅ Correct: Parameterized queries
query: "SELECT * FROM users WHERE id = $1",
parameters: ["={{$json.id}}"]
3. ❌ Wrong: No transaction for multi-step operations
INSERT into orders
INSERT into order_items // Fails → orphaned order record
✅ Correct: Use transaction
BEGIN
INSERT into orders
INSERT into order_items
COMMIT (or ROLLBACK on error)
4. ❌ Wrong: Processing all items at once
SELECT 1000000 records → Process all → OOM error
✅ Correct: Batch processing
SELECT records → Split In Batches (1000) → Process → Loop
Real Template Examples
From the n8n template library (456 database templates):
Data Sync:
Schedule → Postgres (SELECT new records) → Transform → MySQL (INSERT)
ETL Pipeline:
Schedule → [Multiple DB reads] → Merge → Transform → Warehouse (INSERT)
Backup:
Schedule → Postgres (SELECT all) → JSON → Google Drive (upload)
Use search_templates({query: "database"}) to find more!
Checklist for Database Workflows
Planning
- Identify source and target databases
- Understand schema differences
- Plan transformation logic
- Consider batch size for large datasets
- Design error handling strategy
Implementation
- Use parameterized queries (never concatenate)
- Add LIMIT to all SELECT queries
- Use appropriate operation (INSERT/UPDATE/UPSERT)
- Configure credentials properly
- Test with small dataset first
Performance
- Add database indexes for queries
- Use batch operations
- Implement pagination for large datasets
- Configure connection pooling
- Monitor query execution times
Security
- Use parameterized queries (SQL injection prevention)
- Least privilege database user
- Validate and sanitize input
- Encrypt sensitive data
- Never log sensitive data
Reliability
- Add transaction handling if needed
- Check rows affected
- Handle constraint violations
- Implement retry logic
- Add Error Trigger workflow
Summary
Key Points:
- Always use parameterized queries (prevent SQL injection)
- Batch processing for large datasets
- Transaction handling for multi-step operations
- Limit result sets to avoid memory issues
- Validate input data before writes
Pattern: Trigger → Query → Transform → Write → Verify
Related:
- http_api_integration.md - Fetching data to store in DB
- scheduled_tasks.md - Periodic database maintenance