Initial commit

Zhongwei Li
2025-11-30 08:24:26 +08:00
commit ce4251a69d
14 changed files with 3532 additions and 0 deletions

View File

@@ -0,0 +1,127 @@
/**
* Basic Queue Consumer (Implicit Acknowledgement)
*
* Use when: Operations are idempotent (safe to retry)
* Examples: Logging, sending emails (idempotent with deduplication), metrics
*
* How it works:
* - If handler returns successfully → all messages acknowledged
* - If handler throws error → entire batch retried
*
* Setup:
* 1. Create queue: npx wrangler queues create my-queue
* 2. Add consumer binding to wrangler.jsonc (see wrangler-queues-config.jsonc)
* 3. Deploy: npm run deploy
*/
type Env = {
  // Add your bindings here (D1, KV, R2, etc.)
  LOGS: KVNamespace;
  RESEND_API_KEY: string; // Secret: npx wrangler secret put RESEND_API_KEY
};
export default {
async queue(
batch: MessageBatch,
env: Env,
ctx: ExecutionContext
): Promise<void> {
console.log(`Processing batch of ${batch.messages.length} messages from queue: ${batch.queue}`);
// Process each message
for (const message of batch.messages) {
console.log(`Message ID: ${message.id}`);
console.log(`Attempt: ${message.attempts}`);
console.log(`Timestamp: ${message.timestamp}`);
console.log(`Body:`, message.body);
// Your processing logic
await processMessage(message.body, env);
}
// Implicit acknowledgement:
// Returning successfully acknowledges ALL messages
// If this function throws, ALL messages will be retried
},
};
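/*
 * For reference (a hedged aside, not part of the original example): the same
 * implicit model can also be expressed with the batch-level helpers on
 * MessageBatch — batch.ackAll() acknowledges every message at once, and
 * batch.retryAll({ delaySeconds: 60 }) redelivers the whole batch after a
 * delay instead of relying on a thrown error.
 */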
/**
* Process individual message
*/
async function processMessage(body: any, env: Env) {
switch (body.type) {
case 'send-email':
      await sendEmail(body, env);
break;
case 'log-event':
await logEvent(body, env);
break;
case 'update-metrics':
await updateMetrics(body, env);
break;
default:
console.warn(`Unknown message type: ${body.type}`);
}
}
/**
* Example: Send email (idempotent with external deduplication)
*/
async function sendEmail(data: any, env: Env) {
console.log(`Sending email to ${data.to}`);
// Call email API (e.g., Resend, SendGrid)
const response = await fetch('https://api.resend.com/emails', {
method: 'POST',
headers: {
      'Authorization': `Bearer ${env.RESEND_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
from: 'noreply@example.com',
to: data.to,
subject: data.subject,
html: data.html,
}),
});
if (!response.ok) {
throw new Error(`Failed to send email: ${response.statusText}`);
}
console.log(`Email sent successfully to ${data.to}`);
}
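/**
 * Hedged sketch of the "deduplication" mentioned in the header: record each
 * send in KV so a redelivered message becomes a no-op. The `dedupeKey` field
 * and the 24-hour TTL are illustrative assumptions, not part of the original
 * example.
 */
async function sendEmailOnce(data: any, env: Env) {
  const dedupeKey = `email-sent:${data.dedupeKey ?? `${data.to}:${data.subject}`}`;
  // Already recorded? Then this delivery is a retry and we skip the send
  if (await env.LOGS.get(dedupeKey)) {
    console.log(`Skipping duplicate email: ${dedupeKey}`);
    return;
  }
  await sendEmail(data, env);
  // Mark as sent so future deliveries of the same message are skipped
  await env.LOGS.put(dedupeKey, '1', { expirationTtl: 86400 });
}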
/**
* Example: Log event to KV (idempotent)
*/
async function logEvent(data: any, env: Env) {
const logKey = `log:${data.eventId}:${Date.now()}`;
await env.LOGS.put(logKey, JSON.stringify({
event: data.event,
userId: data.userId,
timestamp: new Date().toISOString(),
metadata: data.metadata,
}), {
expirationTtl: 86400 * 30, // 30 days
});
console.log(`Event logged: ${logKey}`);
}
/**
 * Example: Update metrics (simple counter)
 *
 * Caveat: a read-modify-write counter in KV is neither atomic nor idempotent,
 * so a retried message double counts. See the idempotent variant sketched
 * below the function.
 */
async function updateMetrics(data: any, env: Env) {
// Increment counter in KV
const key = `metric:${data.metric}`;
const current = await env.LOGS.get(key);
const count = current ? parseInt(current) : 0;
await env.LOGS.put(key, String(count + 1));
console.log(`Metric ${data.metric} updated: ${count + 1}`);
}
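/**
 * Hedged sketch of an idempotent alternative (referenced above): write one KV
 * key per event, keyed by an eventId assumed to be on the message, so a
 * redelivered message overwrites the same key instead of double counting.
 * Totals are derived at read time with list(), which is eventually consistent
 * and paginates at 1000 keys — fine for a sketch, not for large volumes.
 */
async function recordMetricEvent(data: any, env: Env) {
  const key = `metric:${data.metric}:${data.eventId}`;
  await env.LOGS.put(key, '1', { expirationTtl: 86400 * 30 });
}

async function readMetricTotal(metric: string, env: Env): Promise<number> {
  // Count the per-event keys written by recordMetricEvent
  const entries = await env.LOGS.list({ prefix: `metric:${metric}:` });
  return entries.keys.length;
}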

View File

@@ -0,0 +1,157 @@
/**
* Queue Consumer with Explicit Acknowledgement
*
* Use when: Operations are non-idempotent (NOT safe to retry)
* Examples: Database writes, payment processing, API calls with side effects
*
* How it works:
* - Call message.ack() after successful processing
* - Only acknowledged messages are removed from queue
* - Failed messages can retry independently
*
* Setup:
* 1. Create queue: npx wrangler queues create my-queue
* 2. Create DLQ: npx wrangler queues create my-dlq
* 3. Add consumer binding with DLQ (see wrangler-queues-config.jsonc)
* 4. Deploy: npm run deploy
*/
type Env = {
DB: D1Database;
API_KEY: string;
};
export default {
async queue(
batch: MessageBatch,
env: Env,
ctx: ExecutionContext
): Promise<void> {
console.log(`Processing batch of ${batch.messages.length} messages`);
// Process each message individually
for (const message of batch.messages) {
try {
// Non-idempotent operation
await processNonIdempotent(message.body, env);
// CRITICAL: Explicitly acknowledge success
message.ack();
console.log(`✅ Message ${message.id} processed successfully`);
      } catch (error) {
        console.error(`❌ Failed to process message ${message.id}:`, error);
        // Explicitly mark the message for redelivery
        // After max_retries, it is sent to the DLQ (if configured)
        message.retry();
      }
    }
    // Note: we DON'T throw an error here.
    // Messages we ack()ed stay acknowledged; messages we retry()ed are
    // redelivered. A message that is neither acked nor retried is implicitly
    // acknowledged when the handler returns successfully.
},
};
/**
* Process message with non-idempotent operations
*/
async function processNonIdempotent(body: any, env: Env) {
switch (body.type) {
case 'create-order':
await createOrder(body, env);
break;
case 'charge-payment':
await chargePayment(body, env);
break;
case 'update-inventory':
await updateInventory(body, env);
break;
default:
throw new Error(`Unknown message type: ${body.type}`);
}
}
/**
* Example: Create database record (non-idempotent)
*/
async function createOrder(data: any, env: Env) {
// Insert into database (can't be retried safely)
const result = await env.DB.prepare(`
INSERT INTO orders (id, user_id, total, status, created_at)
VALUES (?, ?, ?, ?, ?)
`)
.bind(
data.orderId,
data.userId,
data.total,
'pending',
new Date().toISOString()
)
.run();
if (!result.success) {
throw new Error(`Failed to create order: ${data.orderId}`);
}
console.log(`Order created: ${data.orderId}`);
}
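/*
 * Hedged note: since D1 is SQLite, `INSERT OR IGNORE` (with `id` as the
 * primary key) would make a duplicate delivery of the same order a no-op,
 * turning this write into an effectively idempotent one. The example above
 * keeps the plain INSERT to illustrate the non-idempotent case.
 */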
/**
* Example: Charge payment (non-idempotent)
*/
async function chargePayment(data: any, env: Env) {
// Call payment API (can't be retried safely without deduplication)
const response = await fetch('https://api.stripe.com/v1/charges', {
method: 'POST',
headers: {
'Authorization': `Bearer ${env.API_KEY}`,
'Content-Type': 'application/x-www-form-urlencoded',
},
body: new URLSearchParams({
amount: String(data.amount),
currency: data.currency,
source: data.token,
description: data.description,
}),
});
if (!response.ok) {
const error = await response.text();
throw new Error(`Payment failed: ${error}`);
}
  const charge = (await response.json()) as { id: string };
console.log(`Payment charged: ${charge.id}`);
// Update database with charge ID
await env.DB.prepare(`
UPDATE orders SET payment_id = ?, status = 'paid' WHERE id = ?
`)
.bind(charge.id, data.orderId)
.run();
}
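/*
 * Hedged note: Stripe's API accepts an `Idempotency-Key` request header. A
 * safer variant of the call above would add, for example,
 *
 *   'Idempotency-Key': data.orderId,
 *
 * to the headers, so a redelivered message repeats the same request and
 * Stripe returns the original charge instead of charging twice. (Using the
 * order ID as the key is an assumption for illustration.)
 */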
/**
* Example: Update inventory (non-idempotent)
*/
async function updateInventory(data: any, env: Env) {
for (const item of data.items) {
// Decrement inventory count
const result = await env.DB.prepare(`
UPDATE inventory
SET quantity = quantity - ?
WHERE sku = ? AND quantity >= ?
`)
.bind(item.quantity, item.sku, item.quantity)
.run();
    if (result.meta.changes === 0) {
throw new Error(`Insufficient inventory for SKU: ${item.sku}`);
}
console.log(`Inventory updated: ${item.sku} (-${item.quantity})`);
}
}

View File

@@ -0,0 +1,215 @@
/**
* Dead Letter Queue (DLQ) Consumer
*
* Handles messages that failed after max retries in the main queue.
*
* Use when: You need to:
* - Log permanently failed messages
* - Alert ops team about failures
* - Store failed messages for manual review
* - Implement custom retry logic
*
* Setup:
* 1. Create DLQ: npx wrangler queues create my-dlq
* 2. Configure main queue consumer with DLQ:
* "dead_letter_queue": "my-dlq" in wrangler.jsonc
* 3. Create consumer for DLQ (this file)
* 4. Deploy both consumers
*/
type Env = {
  DB: D1Database;
  ALERTS: KVNamespace;
  MAIN_QUEUE: Queue; // Reference to main queue for retry
  SLACK_WEBHOOK_URL: string; // Secret: npx wrangler secret put SLACK_WEBHOOK_URL
};
export default {
async queue(
batch: MessageBatch,
env: Env,
ctx: ExecutionContext
): Promise<void> {
console.log(`⚠️ Processing ${batch.messages.length} FAILED messages from DLQ`);
for (const message of batch.messages) {
try {
await handleFailedMessage(message, env);
// Acknowledge to remove from DLQ
message.ack();
      } catch (error) {
        console.error(`Failed to process DLQ message ${message.id}:`, error);
        // Explicitly retry so the message stays in the DLQ for another attempt
        message.retry();
      }
}
},
};
/**
* Handle permanently failed message
*/
async function handleFailedMessage(message: Message, env: Env) {
console.log(`💀 Dead Letter Message:`);
console.log(` ID: ${message.id}`);
console.log(` Attempts: ${message.attempts}`);
console.log(` Original Timestamp: ${message.timestamp}`);
console.log(` Body:`, message.body);
// 1. Store in database for manual review
  await storeFailedMessage(message, env);
// 2. Send alert to ops team
await sendAlert(message, env);
// 3. Optional: Implement custom retry logic
if (shouldRetryInMainQueue(message)) {
await retryInMainQueue(message, env);
}
}
/**
* Store failed message in database
*/
async function storeFailedMessage(message: Message, env: Env) {
await env.DB.prepare(`
INSERT INTO failed_messages (
id,
queue_name,
body,
attempts,
original_timestamp,
failed_at,
error_details
) VALUES (?, ?, ?, ?, ?, ?, ?)
`)
.bind(
message.id,
'my-queue', // Or get from message.body if you include it
JSON.stringify(message.body),
message.attempts,
message.timestamp.toISOString(),
new Date().toISOString(),
JSON.stringify({
reason: 'Max retries exceeded',
lastAttempt: message.attempts,
})
)
.run();
console.log(`Stored failed message in database: ${message.id}`);
}
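/*
 * Hedged sketch of a matching D1 schema (column types inferred from the
 * bind() calls above):
 *
 *   CREATE TABLE IF NOT EXISTS failed_messages (
 *     id                 TEXT PRIMARY KEY,
 *     queue_name         TEXT NOT NULL,
 *     body               TEXT NOT NULL,
 *     attempts           INTEGER NOT NULL,
 *     original_timestamp TEXT NOT NULL,
 *     failed_at          TEXT NOT NULL,
 *     error_details      TEXT
 *   );
 */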
/**
* Send alert to ops team
*/
async function sendAlert(message: Message<any>, env: Env) {
// Send to monitoring service (e.g., PagerDuty, Slack, Email)
const alert = {
severity: 'high',
title: 'Queue Message Permanently Failed',
description: `Message ${message.id} failed after ${message.attempts} attempts`,
details: {
messageId: message.id,
attempts: message.attempts,
body: message.body,
timestamp: message.timestamp.toISOString(),
},
};
// Example: Store in KV for alert aggregation
const alertKey = `alert:${message.id}`;
await env.ALERTS.put(alertKey, JSON.stringify(alert), {
expirationTtl: 86400 * 7, // 7 days
});
// Example: Send webhook to Slack
  await fetch(env.SLACK_WEBHOOK_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
text: `🚨 Queue DLQ Alert: Message ${message.id} failed permanently`,
blocks: [
{
type: 'section',
text: {
type: 'mrkdwn',
text: `*Message ID:* ${message.id}\n*Attempts:* ${message.attempts}\n*Type:* ${message.body.type}`,
},
},
],
}),
});
console.log(`Alert sent for message: ${message.id}`);
}
/**
* Determine if message should be retried in main queue
* (e.g., after fixing a bug, or for specific message types)
*/
function shouldRetryInMainQueue(message: Message<any>): boolean {
// Example: Retry if it's a payment message and attempts < 10
if (message.body.type === 'charge-payment' && message.attempts < 10) {
return true;
}
// Example: Retry if it failed due to a specific error
if (message.body.retryable === true) {
return true;
}
return false;
}
/**
* Retry message in main queue (with delay)
*/
async function retryInMainQueue(message: Message<any>, env: Env) {
console.log(`Retrying message ${message.id} in main queue`);
// Send back to main queue with exponential delay
const delaySeconds = Math.min(
3600 * Math.pow(2, message.attempts - 3), // Start from where DLQ picked up
43200 // Max 12 hours
);
await env.MAIN_QUEUE.send(
{
...message.body,
retriedFromDLQ: true,
originalMessageId: message.id,
dlqAttempts: message.attempts,
},
{ delaySeconds }
);
console.log(`Message ${message.id} re-queued with ${delaySeconds}s delay`);
}
/**
* Manual retry endpoint (call via REST API or dashboard)
*/
export async function manualRetry(messageId: string, env: Env) {
// Fetch failed message from database
const result = await env.DB.prepare(
'SELECT * FROM failed_messages WHERE id = ?'
)
.bind(messageId)
.first();
if (!result) {
throw new Error(`Message ${messageId} not found in DLQ`);
}
const body = JSON.parse(result.body as string);
// Send back to main queue
await env.MAIN_QUEUE.send({
...body,
manualRetry: true,
retriedBy: 'admin',
retriedAt: new Date().toISOString(),
});
console.log(`Manually retried message: ${messageId}`);
}
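/*
 * Hedged sketch: one way to expose manualRetry() over HTTP is to add a fetch
 * handler next to the queue handler in the default export above, e.g.
 *
 *   async fetch(request: Request, env: Env): Promise<Response> {
 *     const { messageId } = (await request.json()) as { messageId: string };
 *     await manualRetry(messageId, env);
 *     return Response.json({ retried: messageId });
 *   },
 *
 * Routing and authentication are omitted; the request shape is an assumption.
 */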

View File

@@ -0,0 +1,243 @@
/**
* Queue Producer Example
*
* Shows how to send messages to Cloudflare Queues from a Hono Worker.
*
* Setup:
* 1. Create queue: npx wrangler queues create my-queue
* 2. Add producer binding to wrangler.jsonc (see wrangler-queues-config.jsonc)
* 3. Deploy: npm run deploy
*/
import { Hono } from 'hono';
type Bindings = {
MY_QUEUE: Queue;
};
const app = new Hono<{ Bindings: Bindings }>();
// ============================================================================
// Send Single Message
// ============================================================================
app.post('/send', async (c) => {
const body = await c.req.json();
// Simple send
await c.env.MY_QUEUE.send({
type: 'process-order',
orderId: body.orderId,
userId: body.userId,
timestamp: Date.now(),
});
return c.json({ status: 'queued' });
});
// ============================================================================
// Send Message with Delay
// ============================================================================
app.post('/send-delayed', async (c) => {
const { task, delayMinutes } = await c.req.json();
// Delay message delivery
await c.env.MY_QUEUE.send(
{
type: 'scheduled-task',
task,
scheduledFor: Date.now() + (delayMinutes * 60 * 1000),
},
{
delaySeconds: delayMinutes * 60, // Convert minutes to seconds
}
);
return c.json({
status: 'scheduled',
delayMinutes,
processAt: new Date(Date.now() + (delayMinutes * 60 * 1000)).toISOString(),
});
});
// ============================================================================
// Send Batch of Messages
// ============================================================================
app.post('/send-batch', async (c) => {
const items = await c.req.json<Array<any>>();
// Validate batch size (max 100 messages)
if (items.length > 100) {
return c.json(
{ error: 'Maximum 100 messages per batch' },
400
);
}
// Send batch
await c.env.MY_QUEUE.sendBatch(
items.map((item) => ({
body: {
type: 'batch-process',
itemId: item.id,
data: item.data,
},
}))
);
return c.json({
status: 'queued',
count: items.length,
});
});
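// ============================================================================
// Send Large Batch (hedged sketch: chunking past the 100-message limit)
// ============================================================================
// The route name and message shape below are illustrative assumptions; the
// chunk size mirrors the per-call limit validated above.
app.post('/send-large-batch', async (c) => {
  const items = await c.req.json<Array<any>>();
  const CHUNK_SIZE = 100;
  // One sendBatch() call per chunk of up to 100 messages
  for (let i = 0; i < items.length; i += CHUNK_SIZE) {
    const chunk = items.slice(i, i + CHUNK_SIZE);
    await c.env.MY_QUEUE.sendBatch(
      chunk.map((item) => ({
        body: {
          type: 'batch-process',
          itemId: item.id,
          data: item.data,
        },
      }))
    );
  }
  return c.json({ status: 'queued', count: items.length });
});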
// ============================================================================
// Send Batch with Individual Delays
// ============================================================================
app.post('/send-scheduled-batch', async (c) => {
const tasks = await c.req.json<Array<{ task: string; delayMinutes: number }>>();
await c.env.MY_QUEUE.sendBatch(
tasks.map((task) => ({
body: {
type: 'scheduled-task',
task: task.task,
},
delaySeconds: task.delayMinutes * 60,
}))
);
return c.json({
status: 'scheduled',
count: tasks.length,
});
});
// ============================================================================
// Validate Message Size (< 128 KB)
// ============================================================================
app.post('/send-validated', async (c) => {
const body = await c.req.json();
// Check message size
const messageSize = new TextEncoder().encode(JSON.stringify(body)).length;
const MAX_SIZE = 128 * 1024; // 128 KB
if (messageSize > MAX_SIZE) {
return c.json(
{
error: 'Message too large',
size: messageSize,
maxSize: MAX_SIZE,
},
400
);
}
await c.env.MY_QUEUE.send(body);
return c.json({
status: 'queued',
messageSize,
});
});
// ============================================================================
// Real-World Example: E-commerce Order Processing
// ============================================================================
interface Order {
orderId: string;
userId: string;
items: Array<{ sku: string; quantity: number; price: number }>;
total: number;
email: string;
}
app.post('/orders', async (c) => {
const order: Order = await c.req.json();
// Queue multiple tasks for this order
await c.env.MY_QUEUE.sendBatch([
// Immediate: Send confirmation email
{
body: {
type: 'send-email',
template: 'order-confirmation',
to: order.email,
orderId: order.orderId,
},
},
// Immediate: Update inventory
{
body: {
type: 'update-inventory',
items: order.items,
orderId: order.orderId,
},
},
// Delayed: Send shipping notification (estimated 2 hours)
{
body: {
type: 'send-email',
template: 'shipping-notification',
to: order.email,
orderId: order.orderId,
},
delaySeconds: 2 * 60 * 60, // 2 hours
},
// Delayed: Request review (3 days later)
{
body: {
type: 'send-email',
template: 'review-request',
to: order.email,
orderId: order.orderId,
},
delaySeconds: 3 * 24 * 60 * 60, // 3 days
},
]);
return c.json({
status: 'success',
orderId: order.orderId,
tasksQueued: 4,
});
});
// ============================================================================
// Webhook Handler Example
// ============================================================================
app.post('/webhooks/:service', async (c) => {
const service = c.req.param('service');
const payload = await c.req.json();
// Queue webhook for async processing
await c.env.MY_QUEUE.send({
type: 'webhook',
service,
payload,
receivedAt: Date.now(),
});
// Respond immediately (don't block webhook)
return c.json({ received: true }, 200);
});
// ============================================================================
// Health Check
// ============================================================================
app.get('/health', (c) => {
return c.json({ status: 'ok' });
});
export default app;
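/*
 * Hedged usage examples for the routes above (payload fields assumed, worker
 * hostname is a placeholder):
 *
 *   curl -X POST https://<your-worker>/send \
 *     -H 'Content-Type: application/json' \
 *     -d '{"orderId":"ord_123","userId":"user_456"}'
 *
 *   curl -X POST https://<your-worker>/send-delayed \
 *     -H 'Content-Type: application/json' \
 *     -d '{"task":"cleanup","delayMinutes":30}'
 */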

View File

@@ -0,0 +1,241 @@
/**
* Queue Consumer with Exponential Backoff Retry
*
* Use when: Calling rate-limited APIs or handling temporary failures
*
* Strategy:
* - Retry with increasing delays: 1m → 2m → 4m → 8m → ...
* - Different delays for different error types
* - Max delay cap to prevent excessive waits
*
* Setup:
* 1. Create queue: npx wrangler queues create api-tasks
* 2. Create DLQ: npx wrangler queues create api-tasks-dlq
* 3. Configure consumer with higher max_retries (e.g., 10)
* 4. Deploy: npm run deploy
*/
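/*
 * Hedged sketch of the matching consumer entry in wrangler.jsonc (names and
 * values taken from the setup notes above):
 *
 *   "consumers": [
 *     {
 *       "queue": "api-tasks",
 *       "max_batch_size": 10,
 *       "max_retries": 10,
 *       "dead_letter_queue": "api-tasks-dlq"
 *     }
 *   ]
 */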
type Env = {
DB: D1Database;
API_KEY: string;
};
export default {
async queue(
batch: MessageBatch,
env: Env,
ctx: ExecutionContext
): Promise<void> {
console.log(`Processing batch of ${batch.messages.length} messages`);
for (const message of batch.messages) {
try {
await processWithRetry(message, env);
message.ack();
} catch (error) {
await handleError(message, error);
}
}
},
};
/**
* Process message with smart retry logic
*/
async function processWithRetry(message: Message<any>, env: Env) {
const { type, data } = message.body;
console.log(`Processing ${type} (attempt ${message.attempts})`);
switch (type) {
case 'call-api':
await callExternalAPI(data, message.attempts);
break;
case 'process-webhook':
await processWebhook(data);
break;
default:
throw new Error(`Unknown message type: ${type}`);
}
}
/**
* Call external API with retry handling
*/
async function callExternalAPI(data: any, attempts: number) {
const response = await fetch(data.url, {
method: data.method || 'POST',
headers: {
'Content-Type': 'application/json',
...data.headers,
},
body: JSON.stringify(data.payload),
});
// Handle different response codes
if (response.ok) {
console.log(`✅ API call successful`);
return await response.json();
}
// Rate limiting
if (response.status === 429) {
const retryAfter = response.headers.get('Retry-After');
const delaySeconds = retryAfter ? parseInt(retryAfter) : undefined;
throw new RateLimitError('Rate limited', delaySeconds, attempts);
}
// Server errors (500-599) - retry
if (response.status >= 500) {
throw new ServerError(`Server error: ${response.status}`, attempts);
}
// Client errors (400-499) - don't retry
if (response.status >= 400) {
const error = await response.text();
throw new ClientError(`Client error: ${error}`);
}
throw new Error(`Unexpected response: ${response.status}`);
}
/**
* Process webhook with timeout
*/
async function processWebhook(data: any) {
// Simulate processing
await new Promise(resolve => setTimeout(resolve, 1000));
console.log(`Webhook processed: ${data.id}`);
}
/**
* Handle errors with appropriate retry strategy
*/
async function handleError(message: Message, error: any) {
console.error(`Error processing message ${message.id}:`, error);
// Rate limit error - use suggested delay or exponential backoff
if (error instanceof RateLimitError) {
const delaySeconds = error.suggestedDelay || calculateExponentialBackoff(
message.attempts,
60, // Base delay: 1 minute
3600 // Max delay: 1 hour
);
console.log(`⏰ Rate limited. Retrying in ${delaySeconds}s (attempt ${message.attempts})`);
message.retry({ delaySeconds });
return;
}
// Server error - exponential backoff
if (error instanceof ServerError) {
const delaySeconds = calculateExponentialBackoff(
message.attempts,
30, // Base delay: 30 seconds
1800 // Max delay: 30 minutes
);
console.log(`🔄 Server error. Retrying in ${delaySeconds}s (attempt ${message.attempts})`);
message.retry({ delaySeconds });
return;
}
  // Client error - retrying will not help
  if (error instanceof ClientError) {
    console.error(`❌ Client error. Not retrying: ${error.message}`);
    // A message that is neither ack()ed nor retry()ed is implicitly
    // acknowledged when the handler returns, so it would NOT reach the DLQ
    // on its own. Acknowledge it explicitly and rely on logs for review.
    message.ack();
    return;
  }
// Unknown error - retry with exponential backoff
const delaySeconds = calculateExponentialBackoff(
message.attempts,
60, // Base delay: 1 minute
7200 // Max delay: 2 hours
);
console.log(`⚠️ Unknown error. Retrying in ${delaySeconds}s (attempt ${message.attempts})`);
message.retry({ delaySeconds });
}
/**
* Calculate exponential backoff delay
*
* Formula: min(baseDelay * 2^(attempts-1), maxDelay)
*
* Example (baseDelay=60, maxDelay=3600):
* - Attempt 1: 60s (1 min)
* - Attempt 2: 120s (2 min)
* - Attempt 3: 240s (4 min)
* - Attempt 4: 480s (8 min)
* - Attempt 5: 960s (16 min)
* - Attempt 6: 1920s (32 min)
* - Attempt 7+: 3600s (1 hour) - capped
*/
function calculateExponentialBackoff(
attempts: number,
baseDelay: number,
maxDelay: number
): number {
const delay = baseDelay * Math.pow(2, attempts - 1);
return Math.min(delay, maxDelay);
}
/**
* Calculate jittered backoff (prevents thundering herd)
*
* Adds randomness to delay to spread out retries
*/
function calculateJitteredBackoff(
attempts: number,
baseDelay: number,
maxDelay: number
): number {
const exponentialDelay = baseDelay * Math.pow(2, attempts - 1);
const delay = Math.min(exponentialDelay, maxDelay);
// Add jitter: ±25% randomness
const jitter = delay * 0.25 * (Math.random() * 2 - 1);
return Math.floor(delay + jitter);
}
// ============================================================================
// Custom Error Classes
// ============================================================================
class RateLimitError extends Error {
suggestedDelay?: number;
attempts: number;
constructor(message: string, suggestedDelay?: number, attempts: number = 1) {
super(message);
this.name = 'RateLimitError';
this.suggestedDelay = suggestedDelay;
this.attempts = attempts;
}
}
class ServerError extends Error {
attempts: number;
constructor(message: string, attempts: number = 1) {
super(message);
this.name = 'ServerError';
this.attempts = attempts;
}
}
class ClientError extends Error {
constructor(message: string) {
super(message);
this.name = 'ClientError';
}
}

View File

@@ -0,0 +1,57 @@
{
"name": "my-worker",
"main": "src/index.ts",
"compatibility_date": "2025-10-11",
// Queue Bindings Configuration
"queues": {
// Producer Configuration (send messages TO queues)
"producers": [
{
"binding": "MY_QUEUE", // Available as env.MY_QUEUE in code
"queue": "my-queue", // Queue name (must be created first)
"delivery_delay": 0 // Optional: delay all messages (0-43200 seconds)
},
{
"binding": "HIGH_PRIORITY_QUEUE",
"queue": "high-priority-queue"
},
{
"binding": "LOW_PRIORITY_QUEUE",
"queue": "low-priority-queue",
"delivery_delay": 60 // Delay all messages by 1 minute
}
],
// Consumer Configuration (receive messages FROM queues)
"consumers": [
{
"queue": "my-queue", // Queue to consume from
"max_batch_size": 10, // Max messages per batch (1-100, default: 10)
"max_batch_timeout": 5, // Max seconds to wait (0-60, default: 5)
"max_retries": 3, // Max retry attempts (0-100, default: 3)
"dead_letter_queue": "my-dlq", // Optional: where failed messages go
"retry_delay": 0, // Optional: delay retries (seconds)
"max_concurrency": null // Optional: limit concurrent consumers (default: auto-scale to 250)
},
{
"queue": "high-priority-queue",
"max_batch_size": 50, // Larger batches for high volume
"max_batch_timeout": 1, // Low latency
"max_retries": 5,
"dead_letter_queue": "priority-dlq"
},
{
"queue": "my-dlq", // Consumer for dead letter queue
"max_batch_size": 10,
"max_batch_timeout": 30 // Can wait longer for DLQ
}
]
},
// Optional: Increase CPU limit for long-running consumers
"limits": {
"cpu_ms": 30000 // 30 seconds (default). Can be increased to 300000 (5 minutes)
}
}