commit ce4251a69d6839cbc470a3f1bbe0cbe5ac983eaf Author: Zhongwei Li Date: Sun Nov 30 08:24:26 2025 +0800 Initial commit diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..1772fb8 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,12 @@ +{ + "name": "cloudflare-queues", + "description": "Build async message queues with Cloudflare Queues for background processing. Use when: handling async tasks, batch processing, implementing retries, configuring dead letter queues, managing consumer concurrency, or troubleshooting queue timeout, batch retry, message loss, or throughput exceeded.", + "version": "1.0.0", + "author": { + "name": "Jeremy Dawes", + "email": "jeremy@jezweb.net" + }, + "skills": [ + "./" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..008fefb --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# cloudflare-queues + +Build async message queues with Cloudflare Queues for background processing. Use when: handling async tasks, batch processing, implementing retries, configuring dead letter queues, managing consumer concurrency, or troubleshooting queue timeout, batch retry, message loss, or throughput exceeded. diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..1641a61 --- /dev/null +++ b/SKILL.md @@ -0,0 +1,558 @@ +--- +name: cloudflare-queues +description: | + Build async message queues with Cloudflare Queues for background processing. Use when: handling async tasks, batch processing, implementing retries, configuring dead letter queues, managing consumer concurrency, or troubleshooting queue timeout, batch retry, message loss, or throughput exceeded. +license: MIT +--- + +# Cloudflare Queues + +**Status**: Production Ready ✅ +**Last Updated**: 2025-11-24 +**Dependencies**: cloudflare-worker-base (for Worker setup) +**Latest Versions**: wrangler@4.50.0, @cloudflare/workers-types@4.20251121.0 + +**Recent Updates (2025)**: +- **April 2025**: Pull consumers increased limits (5,000 msg/s per queue, up from 1,200 requests/5min) +- **March 2025**: Pause & Purge APIs (wrangler queues pause-delivery, queues purge) +- **2025**: Customizable retention (60s to 14 days, previously fixed at 4 days) +- **2025**: Increased queue limits (10,000 queues per account, up from 10) + +--- + +## Quick Start (5 Minutes) + +```bash +# 1. Create queue +npx wrangler queues create my-queue + +# 2. Add producer binding to wrangler.jsonc +# { "queues": { "producers": [{ "binding": "MY_QUEUE", "queue": "my-queue" }] } } + +# 3. Send message from Worker +await env.MY_QUEUE.send({ userId: '123', action: 'process-order' }); + +# 4. Add consumer binding to wrangler.jsonc +# { "queues": { "consumers": [{ "queue": "my-queue", "max_batch_size": 10 }] } } + +# 5. Process messages +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + await processMessage(message.body); + message.ack(); // Explicit acknowledgement + } + } +}; + +# 6. Deploy and test +npx wrangler deploy +npx wrangler tail my-consumer +``` + +--- + +## Producer API + +```typescript +// Send single message +await env.MY_QUEUE.send({ userId: '123', action: 'send-email' }); + +// Send with delay (max 12 hours) +await env.MY_QUEUE.send({ action: 'reminder' }, { delaySeconds: 600 }); + +// Send batch (max 100 messages or 256 KB) +await env.MY_QUEUE.sendBatch([ + { body: { userId: '1' } }, + { body: { userId: '2' } }, +]); +``` + +**Critical Limits:** +- Message size: **128 KB max** (including ~100 bytes metadata) +- Messages >128 KB will fail - store in R2 and send reference instead +- Batch size: 100 messages or 256 KB total +- Delay: 0-43200 seconds (12 hours max) + +--- + +## Consumer API + +```typescript +export default { + async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise { + for (const message of batch.messages) { + // message.id - unique UUID + // message.timestamp - Date when sent + // message.body - your content + // message.attempts - retry count (starts at 1) + + await processMessage(message.body); + message.ack(); // Explicit ack (critical for non-idempotent ops) + } + } +}; + +// Retry with exponential backoff +message.retry({ delaySeconds: Math.min(60 * Math.pow(2, message.attempts - 1), 3600) }); + +// Batch methods +batch.ackAll(); // Ack all messages +batch.retryAll(); // Retry all messages +``` + +**Critical:** +- **`message.ack()`** - Mark success, prevents retry even if handler fails later +- **Use explicit ack for non-idempotent operations** (DB writes, API calls, payments) +- **Implicit ack** - If handler returns successfully without calling ack(), all messages auto-acknowledged +- **Ordering not guaranteed** - Don't assume FIFO message order + +--- + +## Critical Consumer Patterns + +### Explicit Acknowledgement (Non-Idempotent Operations) + +**ALWAYS use explicit ack() for:** Database writes, API calls, financial transactions + +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + try { + await env.DB.prepare('INSERT INTO orders (id, amount) VALUES (?, ?)') + .bind(message.body.orderId, message.body.amount).run(); + message.ack(); // Only ack on success + } catch (error) { + console.error(`Failed ${message.id}:`, error); + // Don't ack - will retry + } + } + } +}; +``` + +**Why?** Prevents duplicate writes if one message in batch fails. Failed messages retry independently. + +--- + +### Exponential Backoff for Rate-Limited APIs + +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + try { + await fetch('https://api.example.com/process', { + method: 'POST', + body: JSON.stringify(message.body), + }); + message.ack(); + } catch (error) { + if (error.status === 429) { + const delaySeconds = Math.min(60 * Math.pow(2, message.attempts - 1), 3600); + message.retry({ delaySeconds }); + } else { + message.retry(); + } + } + } + } +}; +``` + +--- + +### Dead Letter Queue (DLQ) - CRITICAL for Production + +**⚠️ Without DLQ, failed messages are DELETED PERMANENTLY after max_retries** + +```bash +npx wrangler queues create my-dlq +``` + +**wrangler.jsonc:** +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "my-queue", + "max_retries": 3, + "dead_letter_queue": "my-dlq" // Messages go here after 3 failed retries + }] + } +} +``` + +**DLQ Consumer:** +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + console.error('PERMANENTLY FAILED:', message.id, message.body); + await env.DB.prepare('INSERT INTO failed_messages (id, body) VALUES (?, ?)') + .bind(message.id, JSON.stringify(message.body)).run(); + message.ack(); // Remove from DLQ + } + } +}; +``` + +--- + +## Consumer Configuration + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "my-queue", + "max_batch_size": 100, // 1-100 (default: 10) + "max_batch_timeout": 30, // 0-60s (default: 5s) + "max_retries": 5, // 0-100 (default: 3) + "retry_delay": 300, // Seconds (default: 0) + "max_concurrency": 10, // 1-250 (default: auto-scale) + "dead_letter_queue": "my-dlq" // REQUIRED for production + }] + } +} +``` + +**Critical Settings:** + +- **Batching** - Consumer called when EITHER condition met (max_batch_size OR max_batch_timeout) +- **max_retries** - After exhausted: with DLQ → sent to DLQ, without DLQ → **DELETED PERMANENTLY** +- **max_concurrency** - Only set if upstream has rate limits or connection limits. Otherwise leave unset for auto-scaling (up to 250 concurrent invocations) +- **DLQ** - Create separately: `npx wrangler queues create my-dlq` + +--- + +## Wrangler Commands + +```bash +# Create queue +npx wrangler queues create my-queue +npx wrangler queues create my-queue --message-retention-period-secs 1209600 # 14 days + +# Manage queues +npx wrangler queues list +npx wrangler queues info my-queue +npx wrangler queues delete my-queue # ⚠️ Deletes ALL messages! + +# Pause/Purge (March 2025 - NEW) +npx wrangler queues pause-delivery my-queue # Pause processing, keep receiving +npx wrangler queues resume-delivery my-queue +npx wrangler queues purge my-queue # ⚠️ Permanently deletes all messages! + +# Consumer management +npx wrangler queues consumer add my-queue my-consumer-worker \ + --batch-size 50 --batch-timeout 10 --message-retries 5 +npx wrangler queues consumer remove my-queue my-consumer-worker +``` + +--- + +## Limits & Quotas + +| Feature | Limit | +|---------|-------| +| **Queues per account** | 10,000 | +| **Message size** | 128 KB (includes ~100 bytes metadata) | +| **Message retries** | 100 max | +| **Batch size** | 1-100 messages | +| **Batch timeout** | 0-60 seconds | +| **Messages per sendBatch** | 100 (or 256 KB total) | +| **Queue throughput** | 5,000 messages/second per queue | +| **Message retention** | 4 days (default), 14 days (max) | +| **Queue backlog size** | 25 GB per queue | +| **Concurrent consumers** | 250 (push-based, auto-scale) | +| **Consumer duration** | 15 minutes (wall clock) | +| **Consumer CPU time** | 30 seconds (default), 5 minutes (max) | +| **Visibility timeout** | 12 hours (pull consumers) | +| **Message delay** | 12 hours (max) | +| **API rate limit** | 1200 requests / 5 minutes | + +--- + +## Pricing + +**Requires Workers Paid plan** ($5/month) + +**Operations Pricing:** +- First 1,000,000 operations/month: **FREE** +- After that: **$0.40 per million operations** + +**What counts as an operation:** +- Each 64 KB chunk written, read, or deleted +- Messages >64 KB count as multiple operations: + - 65 KB message = 2 operations + - 127 KB message = 2 operations + - 128 KB message = 2 operations + +**Typical message lifecycle:** +- 1 write + 1 read + 1 delete = **3 operations** + +**Retries:** +- Each retry = additional **read operation** +- Message retried 3 times = 1 write + 4 reads + 1 delete = **6 operations** + +**Dead Letter Queue:** +- Writing to DLQ = additional **write operation** + +**Cost examples:** +- 1M messages/month (no retries): ((1M × 3) - 1M) / 1M × $0.40 = **$0.80** +- 10M messages/month: ((10M × 3) - 1M) / 1M × $0.40 = **$11.60** +- 100M messages/month: ((100M × 3) - 1M) / 1M × $0.40 = **$119.60** + +--- + + +## Error Handling + +### Common Errors + +#### 1. Message Too Large + +```typescript +// ❌ Bad: Message >128 KB +await env.MY_QUEUE.send({ + data: largeArray, // >128 KB +}); + +// ✅ Good: Check size before sending +const message = { data: largeArray }; +const size = new TextEncoder().encode(JSON.stringify(message)).length; + +if (size > 128000) { + // Store in R2, send reference + const key = `messages/${crypto.randomUUID()}.json`; + await env.MY_BUCKET.put(key, JSON.stringify(message)); + await env.MY_QUEUE.send({ type: 'large-message', r2Key: key }); +} else { + await env.MY_QUEUE.send(message); +} +``` + +--- + +#### 2. Throughput Exceeded + +```typescript +// ❌ Bad: Exceeding 5000 msg/s per queue +for (let i = 0; i < 10000; i++) { + await env.MY_QUEUE.send({ id: i }); // Too fast! +} + +// ✅ Good: Use sendBatch +const messages = Array.from({ length: 10000 }, (_, i) => ({ + body: { id: i }, +})); + +// Send in batches of 100 +for (let i = 0; i < messages.length; i += 100) { + await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100)); +} + +// ✅ Even better: Rate limit with delay +for (let i = 0; i < messages.length; i += 100) { + await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100)); + if (i + 100 < messages.length) { + await new Promise(resolve => setTimeout(resolve, 100)); // 100ms delay + } +} +``` + +--- + +#### 3. Consumer Timeout + +```typescript +// ❌ Bad: Long processing without CPU limit increase +export default { + async queue(batch: MessageBatch): Promise { + for (const message of batch.messages) { + await processForMinutes(message.body); // CPU timeout! + } + }, +}; + +// ✅ Good: Increase CPU limit in wrangler.jsonc +``` + +**wrangler.jsonc:** + +```jsonc +{ + "limits": { + "cpu_ms": 300000 // 5 minutes (max allowed) + } +} +``` + +--- + +#### 4. Backlog Growing + +```typescript +// Issue: Consumer too slow, backlog growing + +// ✅ Solution 1: Increase batch size +{ + "queues": { + "consumers": [{ + "queue": "my-queue", + "max_batch_size": 100 // Process more per invocation + }] + } +} + +// ✅ Solution 2: Let concurrency auto-scale (don't set max_concurrency) + +// ✅ Solution 3: Optimize consumer code +export default { + async queue(batch: MessageBatch, env: Env): Promise { + // Process in parallel + await Promise.all( + batch.messages.map(async (message) => { + await process(message.body); + message.ack(); + }) + ); + }, +}; +``` + +--- + +## Critical Rules + +**Always:** +- ✅ Configure DLQ for production (`dead_letter_queue` in consumer config) +- ✅ Use explicit `message.ack()` for non-idempotent ops (DB writes, API calls) +- ✅ Validate message size <128 KB before sending +- ✅ Use `sendBatch()` for multiple messages (more efficient) +- ✅ Implement exponential backoff: `60 * Math.pow(2, message.attempts - 1)` +- ✅ Let concurrency auto-scale (don't set `max_concurrency` unless upstream has rate limits) + +**Never:** +- ❌ Never assume FIFO ordering - not guaranteed +- ❌ Never rely on implicit ack for non-idempotent ops - use explicit `ack()` +- ❌ Never send messages >128 KB - will fail (store in R2 instead) +- ❌ Never skip DLQ in production - failed messages DELETED PERMANENTLY without DLQ +- ❌ Never exceed 5,000 msg/s per queue (push consumers) or rate limits apply +- ❌ Never process messages synchronously - use `Promise.all()` for parallelism + +--- + +## Troubleshooting + +### Issue: Messages not being delivered to consumer + +**Possible causes:** +1. Consumer not deployed +2. Wrong queue name in wrangler.jsonc +3. Delivery paused +4. Consumer throwing errors + +**Solution:** + +```bash +# Check queue info +npx wrangler queues info my-queue + +# Check if delivery paused +npx wrangler queues resume-delivery my-queue + +# Check consumer logs +npx wrangler tail my-consumer +``` + +--- + +### Issue: Entire batch retried when one message fails + +**Cause:** Using implicit acknowledgement with non-idempotent operations + +**Solution:** Use explicit ack() + +```typescript +// ✅ Explicit ack +for (const message of batch.messages) { + try { + await dbWrite(message.body); + message.ack(); // Only ack on success + } catch (error) { + console.error(`Failed: ${message.id}`); + // Don't ack - will retry + } +} +``` + +--- + +### Issue: Messages deleted without processing + +**Cause:** No Dead Letter Queue configured + +**Solution:** + +```bash +# Create DLQ +npx wrangler queues create my-dlq + +# Add to consumer config +``` + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "my-queue", + "dead_letter_queue": "my-dlq" + }] + } +} +``` + +--- + +### Issue: Consumer not auto-scaling + +**Possible causes:** +1. `max_concurrency` set to 1 +2. Consumer returning errors (not processing) +3. Batch processing too fast (no backlog) + +**Solution:** + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "my-queue", + // Don't set max_concurrency - let it auto-scale + "max_batch_size": 50 // Increase batch size instead + }] + } +} +``` + +--- + +## Related Documentation + +- [Cloudflare Queues Docs](https://developers.cloudflare.com/queues/) +- [How Queues Works](https://developers.cloudflare.com/queues/reference/how-queues-works/) +- [JavaScript APIs](https://developers.cloudflare.com/queues/configuration/javascript-apis/) +- [Batching & Retries](https://developers.cloudflare.com/queues/configuration/batching-retries/) +- [Consumer Concurrency](https://developers.cloudflare.com/queues/configuration/consumer-concurrency/) +- [Dead Letter Queues](https://developers.cloudflare.com/queues/configuration/dead-letter-queues/) +- [Wrangler Commands](https://developers.cloudflare.com/queues/reference/wrangler-commands/) +- [Limits](https://developers.cloudflare.com/queues/platform/limits/) +- [Pricing](https://developers.cloudflare.com/queues/platform/pricing/) + +--- + +**Last Updated**: 2025-10-21 +**Version**: 1.0.0 +**Maintainer**: Jeremy Dawes | jeremy@jezweb.net diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..7c89a67 --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,85 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:jezweb/claude-skills:skills/cloudflare-queues", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "1d6adf4910b9c99ca3b03b1f04912237e1f17458", + "treeHash": "2a0b88758f491c0ff2ebb3e3d6febe8f77d86939bc5f1038c0e8d70be7db78d9", + "generatedAt": "2025-11-28T10:18:57.011097Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "cloudflare-queues", + "description": "Build async message queues with Cloudflare Queues for background processing. Use when: handling async tasks, batch processing, implementing retries, configuring dead letter queues, managing consumer concurrency, or troubleshooting queue timeout, batch retry, message loss, or throughput exceeded.", + "version": "1.0.0" + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "3e3857051fc470dd855b39dd29931236f393586f7a6c70b74c05e162a77a7981" + }, + { + "path": "SKILL.md", + "sha256": "6458450eaa4b2bb903cec9dd272b5386296e074bd8e8711f2da2416f4cf987a1" + }, + { + "path": "references/consumer-api.md", + "sha256": "b1d4a320e287070219533b312b92a78df264339403c0f623085896700ec577e3" + }, + { + "path": "references/best-practices.md", + "sha256": "70da91d2551f014e4ee22fa1d47ac88ea1789ba362da936cc5cc13995ef8bd03" + }, + { + "path": "references/wrangler-commands.md", + "sha256": "3abc986e265ea9427b87ebcaa341ccd58d26c197e7f9ae4465cfab1b5d1f053b" + }, + { + "path": "references/producer-api.md", + "sha256": "550ad03feb27f50076e669c5c9f38ba4c893bce5f33ffc597ea750781442bf97" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "5e246f2dd0064cf6233202626b2062c096422d7a3bc6ac4fad3d544ee0addad4" + }, + { + "path": "templates/queues-consumer-explicit-ack.ts", + "sha256": "4088de698572b80d4cc4ea569a2bb342154ae9854095732e6711f7db639b7263" + }, + { + "path": "templates/wrangler-queues-config.jsonc", + "sha256": "fbdeb3da478b23a51aad24facd72f593343b5b69cd1e32de75f894f18a11b642" + }, + { + "path": "templates/queues-consumer-basic.ts", + "sha256": "e11fbca3b858805420619d9436637869561ed52c36477a26dcc16d64c64347e5" + }, + { + "path": "templates/queues-retry-with-delay.ts", + "sha256": "25550594b4a3a8cb4fc0af233a748475d5bc6238a8b705b7ffddf61978f31d85" + }, + { + "path": "templates/queues-dlq-pattern.ts", + "sha256": "b5d21ce147dbec91c7b407b55f163277679da06d2e9586eca16564cd8d008c8a" + }, + { + "path": "templates/queues-producer.ts", + "sha256": "361630a98392aa1d75970b568e8e40fcaf8fea6882138bdfe173ea302a4b4fdd" + } + ], + "dirSha256": "2a0b88758f491c0ff2ebb3e3d6febe8f77d86939bc5f1038c0e8d70be7db78d9" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/references/best-practices.md b/references/best-practices.md new file mode 100644 index 0000000..2d928ea --- /dev/null +++ b/references/best-practices.md @@ -0,0 +1,606 @@ +# Cloudflare Queues Best Practices + +Production patterns, optimization strategies, and common pitfalls. + +--- + +## Consumer Design Patterns + +### 1. Explicit Acknowledgement for Non-Idempotent Operations + +**Problem:** Database writes or API calls get duplicated when batch retries + +**Solution:** Use explicit `ack()` for each message + +```typescript +// ❌ Bad: Entire batch retried if one operation fails +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + await env.DB.prepare( + 'INSERT INTO orders (id, data) VALUES (?, ?)' + ).bind(message.body.id, JSON.stringify(message.body)).run(); + } + // If last insert fails, ALL inserts are retried → duplicates! + }, +}; + +// ✅ Good: Each message acknowledged individually +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + try { + await env.DB.prepare( + 'INSERT INTO orders (id, data) VALUES (?, ?)' + ).bind(message.body.id, JSON.stringify(message.body)).run(); + + message.ack(); // Only ack on success + } catch (error) { + console.error(`Failed: ${message.id}`, error); + // Don't ack - will retry this message only + } + } + }, +}; +``` + +--- + +### 2. Exponential Backoff for Rate Limits + +**Problem:** Retrying immediately hits same rate limit + +**Solution:** Use exponential backoff based on attempts + +```typescript +// ❌ Bad: Retry immediately +try { + await callRateLimitedAPI(); + message.ack(); +} catch (error) { + message.retry(); // Immediately hits rate limit again +} + +// ✅ Good: Exponential backoff +try { + await callRateLimitedAPI(); + message.ack(); +} catch (error) { + if (error.status === 429) { + const delaySeconds = Math.min( + 60 * Math.pow(2, message.attempts - 1), // 1m, 2m, 4m, 8m, ... + 3600 // Max 1 hour + ); + + console.log(`Rate limited. Retrying in ${delaySeconds}s`); + message.retry({ delaySeconds }); + } +} +``` + +--- + +### 3. Always Configure Dead Letter Queue + +**Problem:** Messages deleted permanently after max retries + +**Solution:** Always configure DLQ in production + +```jsonc +{ + "queues": { + "consumers": [ + { + "queue": "my-queue", + "max_retries": 3, + "dead_letter_queue": "my-dlq" // ✅ Always configure + } + ] + } +} +``` + +**DLQ Consumer:** + +```typescript +// Monitor and alert on DLQ messages +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + // Log failure + console.error('PERMANENT FAILURE:', message.id, message.body); + + // Store for manual review + await env.DB.prepare( + 'INSERT INTO failed_messages (id, body, attempts) VALUES (?, ?, ?)' + ).bind(message.id, JSON.stringify(message.body), message.attempts).run(); + + // Send alert + await sendAlert(`Message ${message.id} failed permanently`); + + message.ack(); + } + }, +}; +``` + +--- + +## Batch Configuration + +### Optimizing Batch Size + +**High volume, low latency:** + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "high-volume-queue", + "max_batch_size": 100, // Max messages per batch + "max_batch_timeout": 1 // Process ASAP + }] + } +} +``` + +**Low volume, batch efficiency:** + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "low-volume-queue", + "max_batch_size": 50, // Medium batch + "max_batch_timeout": 30 // Wait for batch to fill + }] + } +} +``` + +**Cost optimization:** + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "cost-optimized", + "max_batch_size": 100, // Largest batches + "max_batch_timeout": 60 // Max wait time + }] + } +} +``` + +--- + +## Concurrency Management + +### Let It Auto-Scale (Default) + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "my-queue" + // No max_concurrency - auto-scales to 250 + }] + } +} +``` + +**✅ Use when:** +- Default case +- Want best performance +- No upstream rate limits + +--- + +### Limit Concurrency + +```jsonc +{ + "queues": { + "consumers": [{ + "queue": "rate-limited-api-queue", + "max_concurrency": 10 // Limit to 10 concurrent consumers + }] + } +} +``` + +**✅ Use when:** +- Calling rate-limited APIs +- Database connection limits +- Want to control costs +- Protecting upstream services + +--- + +## Message Design + +### Include Metadata + +```typescript +// ✅ Good: Include helpful metadata +await env.MY_QUEUE.send({ + // Message type for routing + type: 'order-confirmation', + + // Idempotency key + idempotencyKey: crypto.randomUUID(), + + // Correlation ID for tracing + correlationId: requestId, + + // Timestamps + createdAt: Date.now(), + scheduledFor: Date.now() + 3600000, + + // Version for schema evolution + _version: 1, + + // Actual payload + payload: { + orderId: 'ORD-123', + userId: 'USER-456', + total: 99.99, + }, +}); +``` + +--- + +### Message Versioning + +```typescript +// Handle multiple message versions +export default { + async queue(batch: MessageBatch): Promise { + for (const message of batch.messages) { + switch (message.body._version) { + case 1: + await processV1(message.body); + break; + case 2: + await processV2(message.body); + break; + default: + console.warn(`Unknown version: ${message.body._version}`); + } + + message.ack(); + } + }, +}; +``` + +--- + +### Large Messages + +**Problem:** Messages >128 KB fail + +**Solution:** Store in R2, send reference + +```typescript +// Producer +const message = { largeData: ... }; +const size = new TextEncoder().encode(JSON.stringify(message)).length; + +if (size > 128 * 1024) { + // Store in R2 + const key = `messages/${crypto.randomUUID()}.json`; + await env.MY_BUCKET.put(key, JSON.stringify(message)); + + // Send reference + await env.MY_QUEUE.send({ + type: 'large-message', + r2Key: key, + size, + timestamp: Date.now(), + }); +} else { + await env.MY_QUEUE.send(message); +} + +// Consumer +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + if (message.body.type === 'large-message') { + // Fetch from R2 + const obj = await env.MY_BUCKET.get(message.body.r2Key); + const data = await obj.json(); + + await processLargeMessage(data); + + // Clean up R2 + await env.MY_BUCKET.delete(message.body.r2Key); + } else { + await processMessage(message.body); + } + + message.ack(); + } + }, +}; +``` + +--- + +## Error Handling + +### Different Retry Strategies by Error Type + +```typescript +try { + await processMessage(message.body); + message.ack(); +} catch (error) { + // Rate limit - exponential backoff + if (error.status === 429) { + message.retry({ + delaySeconds: Math.min(60 * Math.pow(2, message.attempts - 1), 3600), + }); + } + // Server error - shorter backoff + else if (error.status >= 500) { + message.retry({ delaySeconds: 60 }); + } + // Client error - don't retry + else if (error.status >= 400) { + console.error('Client error, not retrying:', error); + // Don't ack or retry - goes to DLQ + } + // Unknown error - retry immediately + else { + message.retry(); + } +} +``` + +--- + +### Circuit Breaker Pattern + +```typescript +class CircuitBreaker { + private failures = 0; + private lastFailure = 0; + private state: 'closed' | 'open' | 'half-open' = 'closed'; + + async call(fn: () => Promise): Promise { + if (this.state === 'open') { + // Check if we should try again + if (Date.now() - this.lastFailure > 60000) { // 1 minute + this.state = 'half-open'; + } else { + throw new Error('Circuit breaker is open'); + } + } + + try { + const result = await fn(); + + // Success - reset + if (this.state === 'half-open') { + this.state = 'closed'; + this.failures = 0; + } + + return result; + } catch (error) { + this.failures++; + this.lastFailure = Date.now(); + + // Open circuit after 3 failures + if (this.failures >= 3) { + this.state = 'open'; + } + + throw error; + } + } +} + +const breaker = new CircuitBreaker(); + +export default { + async queue(batch: MessageBatch): Promise { + for (const message of batch.messages) { + try { + await breaker.call(() => callUpstreamAPI(message.body)); + message.ack(); + } catch (error) { + if (error.message === 'Circuit breaker is open') { + // Retry later when circuit might be closed + message.retry({ delaySeconds: 120 }); + } else { + message.retry({ delaySeconds: 60 }); + } + } + } + }, +}; +``` + +--- + +## Cost Optimization + +### Batch Operations + +```typescript +// ❌ Bad: 100 operations (3 per message) +for (let i = 0; i < 100; i++) { + await env.MY_QUEUE.send({ id: i }); +} + +// ✅ Good: 3 operations total (write batch, read batch, delete batch) +await env.MY_QUEUE.sendBatch( + Array.from({ length: 100 }, (_, i) => ({ + body: { id: i }, + })) +); +``` + +### Larger Batches + +```jsonc +// Process more messages per invocation +{ + "queues": { + "consumers": [{ + "queue": "my-queue", + "max_batch_size": 100 // ✅ Max batch size = fewer invocations + }] + } +} +``` + +--- + +## Monitoring & Observability + +### Structured Logging + +```typescript +export default { + async queue(batch: MessageBatch): Promise { + console.log(JSON.stringify({ + event: 'batch_start', + queue: batch.queue, + messageCount: batch.messages.length, + timestamp: Date.now(), + })); + + let processed = 0; + let failed = 0; + + for (const message of batch.messages) { + try { + await processMessage(message.body); + message.ack(); + processed++; + } catch (error) { + console.error(JSON.stringify({ + event: 'message_failed', + messageId: message.id, + attempts: message.attempts, + error: error.message, + })); + failed++; + } + } + + console.log(JSON.stringify({ + event: 'batch_complete', + processed, + failed, + duration: Date.now() - batch.messages[0].timestamp.getTime(), + })); + }, +}; +``` + +### Metrics Tracking + +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + const startTime = Date.now(); + + for (const message of batch.messages) { + const msgStartTime = Date.now(); + + try { + await processMessage(message.body); + message.ack(); + + // Track processing time + await env.METRICS.put( + `processing_time:${Date.now()}`, + String(Date.now() - msgStartTime) + ); + } catch (error) { + await env.METRICS.put( + `errors:${Date.now()}`, + JSON.stringify({ + messageId: message.id, + error: error.message, + }) + ); + } + } + + // Track batch metrics + await env.METRICS.put( + `batch_size:${Date.now()}`, + String(batch.messages.length) + ); + }, +}; +``` + +--- + +## Testing + +### Local Development + +```bash +# Start local dev server +npm run dev + +# In another terminal, send test messages +curl -X POST http://localhost:8787/send \ + -H "Content-Type: application/json" \ + -d '{"test": "message"}' + +# Watch consumer logs +npx wrangler tail my-consumer --local +``` + +### Unit Tests + +```typescript +import { describe, it, expect } from 'vitest'; + +describe('Queue Consumer', () => { + it('processes messages correctly', async () => { + const batch: MessageBatch = { + queue: 'test-queue', + messages: [ + { + id: '123', + timestamp: new Date(), + body: { type: 'test', data: 'hello' }, + attempts: 1, + ack: () => {}, + retry: () => {}, + }, + ], + ackAll: () => {}, + retryAll: () => {}, + }; + + const env = { + // Mock bindings + }; + + const ctx = { + waitUntil: () => {}, + passThroughOnException: () => {}, + }; + + await worker.queue(batch, env, ctx); + + // Assert expectations + }); +}); +``` + +--- + +**Last Updated**: 2025-10-21 diff --git a/references/consumer-api.md b/references/consumer-api.md new file mode 100644 index 0000000..246d576 --- /dev/null +++ b/references/consumer-api.md @@ -0,0 +1,499 @@ +# Consumer API Reference + +Complete reference for consuming messages from Cloudflare Queues. + +--- + +## Queue Handler + +Consumer Workers must export a `queue()` handler: + +```typescript +export default { + async queue( + batch: MessageBatch, + env: Env, + ctx: ExecutionContext + ): Promise { + // Process messages + }, +}; +``` + +### Parameters + +- **`batch`** - MessageBatch object containing messages +- **`env`** - Environment bindings (KV, D1, R2, etc.) +- **`ctx`** - Execution context + - `waitUntil(promise)` - Extend Worker lifetime + - `passThroughOnException()` - Continue on error + +### Return Value + +- Must return `Promise` or `void` +- Throwing error = all unacknowledged messages retried +- Returning successfully = implicit ack for messages without explicit ack() + +--- + +## MessageBatch Interface + +```typescript +interface MessageBatch { + readonly queue: string; + readonly messages: Message[]; + ackAll(): void; + retryAll(options?: QueueRetryOptions): void; +} +``` + +### Properties + +#### `queue` (string) + +Name of the queue this batch came from. + +**Use case:** One consumer handling multiple queues + +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + switch (batch.queue) { + case 'high-priority': + await processUrgent(batch.messages); + break; + case 'low-priority': + await processNormal(batch.messages); + break; + default: + console.warn(`Unknown queue: ${batch.queue}`); + } + }, +}; +``` + +--- + +#### `messages` (Message[]) + +Array of Message objects. + +**Important:** +- Ordering is **best effort**, not guaranteed +- Don't rely on message order +- Use timestamps for ordering if needed + +```typescript +// Sort by timestamp if order matters +const sortedMessages = batch.messages.sort( + (a, b) => a.timestamp.getTime() - b.timestamp.getTime() +); +``` + +--- + +### Methods + +#### `ackAll()` - Acknowledge All Messages + +Mark all messages as successfully delivered, even if handler throws error. + +```typescript +export default { + async queue(batch: MessageBatch): Promise { + // Acknowledge all messages upfront + batch.ackAll(); + + // Even if this fails, messages won't retry + await processMessages(batch.messages); + }, +}; +``` + +**Use cases:** +- Idempotent operations where retries are safe +- Already processed messages (deduplication) +- Want to prevent retries regardless of outcome + +--- + +#### `retryAll(options?)` - Retry All Messages + +Mark all messages for retry. + +```typescript +interface QueueRetryOptions { + delaySeconds?: number; // 0-43200 (12 hours) +} + +batch.retryAll(); +batch.retryAll({ delaySeconds: 300 }); // Retry in 5 minutes +``` + +**Use cases:** +- Rate limiting (retry after backoff) +- Temporary system failure +- Upstream service unavailable + +```typescript +export default { + async queue(batch: MessageBatch): Promise { + try { + await callUpstreamAPI(batch.messages); + } catch (error) { + if (error.status === 503) { + // Service unavailable - retry in 5 minutes + batch.retryAll({ delaySeconds: 300 }); + } else { + // Other error - retry immediately + batch.retryAll(); + } + } + }, +}; +``` + +--- + +## Message Interface + +```typescript +interface Message { + readonly id: string; + readonly timestamp: Date; + readonly body: Body; + readonly attempts: number; + ack(): void; + retry(options?: QueueRetryOptions): void; +} +``` + +### Properties + +#### `id` (string) + +Unique system-generated message ID (UUID format). + +```typescript +console.log(message.id); // "550e8400-e29b-41d4-a716-446655440000" +``` + +--- + +#### `timestamp` (Date) + +When message was sent to queue. + +```typescript +console.log(message.timestamp); // Date object +console.log(message.timestamp.toISOString()); // "2025-10-21T12:34:56.789Z" + +// Check message age +const ageMs = Date.now() - message.timestamp.getTime(); +console.log(`Message age: ${ageMs}ms`); +``` + +--- + +#### `body` (any) + +Your message content. + +```typescript +interface MyMessage { + type: string; + userId: string; + data: any; +} + +const message: Message = ...; +console.log(message.body.type); // TypeScript knows the type +console.log(message.body.userId); +console.log(message.body.data); +``` + +--- + +#### `attempts` (number) + +Number of times consumer has attempted to process this message. Starts at 1. + +```typescript +console.log(message.attempts); // 1 (first attempt) + +// Use for exponential backoff +const delaySeconds = 60 * Math.pow(2, message.attempts - 1); +message.retry({ delaySeconds }); + +// Attempts: 1 → 60s, 2 → 120s, 3 → 240s, 4 → 480s, ... +``` + +--- + +### Methods + +#### `ack()` - Acknowledge Message + +Mark message as successfully delivered. Won't retry even if handler fails. + +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + try { + // Non-idempotent operation + await env.DB.prepare( + 'INSERT INTO orders (id, data) VALUES (?, ?)' + ).bind(message.body.id, JSON.stringify(message.body)).run(); + + // CRITICAL: Acknowledge success + message.ack(); + } catch (error) { + console.error(`Failed: ${message.id}`, error); + // Don't ack - will retry + } + } + }, +}; +``` + +**Use cases:** +- Database writes +- Payment processing +- Any non-idempotent operation +- Prevents duplicate processing + +--- + +#### `retry(options?)` - Retry Message + +Mark message for retry. Can optionally delay retry. + +```typescript +interface QueueRetryOptions { + delaySeconds?: number; // 0-43200 (12 hours) +} + +message.retry(); +message.retry({ delaySeconds: 600 }); // Retry in 10 minutes +``` + +**Use cases:** +- Rate limiting (429 errors) +- Temporary failures +- Exponential backoff + +```typescript +// Exponential backoff +message.retry({ + delaySeconds: Math.min( + 60 * Math.pow(2, message.attempts - 1), + 3600 // Max 1 hour + ), +}); + +// Different delays for different errors +try { + await processMessage(message.body); + message.ack(); +} catch (error) { + if (error.status === 429) { + // Rate limited - retry in 5 minutes + message.retry({ delaySeconds: 300 }); + } else if (error.status >= 500) { + // Server error - retry in 1 minute + message.retry({ delaySeconds: 60 }); + } else { + // Client error - don't retry + console.error('Client error, not retrying'); + } +} +``` + +--- + +## Acknowledgement Precedence Rules + +When mixing ack/retry calls: + +1. **`ack()` or `retry()` wins** - First call on a message takes precedence +2. **Individual > Batch** - Message-level call overrides batch-level call +3. **Subsequent calls ignored** - Second call on same message is silently ignored + +```typescript +// ack() takes precedence +message.ack(); +message.retry(); // Ignored + +// retry() takes precedence +message.retry(); +message.ack(); // Ignored + +// Individual overrides batch +message.ack(); +batch.retryAll(); // Doesn't affect this message + +// Batch doesn't affect individually handled messages +for (const msg of batch.messages) { + msg.ack(); // These messages won't be affected by retryAll() +} +batch.retryAll(); // Only affects messages not explicitly ack'd +``` + +--- + +## Processing Patterns + +### Sequential Processing + +```typescript +export default { + async queue(batch: MessageBatch): Promise { + for (const message of batch.messages) { + await processMessage(message.body); + message.ack(); + } + }, +}; +``` + +**Pros:** Simple, ordered processing +**Cons:** Slow for large batches + +--- + +### Parallel Processing + +```typescript +export default { + async queue(batch: MessageBatch): Promise { + await Promise.all( + batch.messages.map(async (message) => { + try { + await processMessage(message.body); + message.ack(); + } catch (error) { + console.error(`Failed: ${message.id}`, error); + } + }) + ); + }, +}; +``` + +**Pros:** Fast, efficient +**Cons:** No ordering, higher memory usage + +--- + +### Batched Database Writes + +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + // Prepare all statements + const statements = batch.messages.map((message) => + env.DB.prepare( + 'INSERT INTO events (id, data) VALUES (?, ?)' + ).bind(message.id, JSON.stringify(message.body)) + ); + + // Execute in batch + const results = await env.DB.batch(statements); + + // Acknowledge based on results + for (let i = 0; i < results.length; i++) { + if (results[i].success) { + batch.messages[i].ack(); + } else { + console.error(`Failed: ${batch.messages[i].id}`); + } + } + }, +}; +``` + +**Pros:** Efficient database usage +**Cons:** More complex error handling + +--- + +### Message Type Routing + +```typescript +export default { + async queue(batch: MessageBatch, env: Env): Promise { + for (const message of batch.messages) { + try { + switch (message.body.type) { + case 'email': + await sendEmail(message.body, env); + break; + case 'sms': + await sendSMS(message.body, env); + break; + case 'push': + await sendPush(message.body, env); + break; + default: + console.warn(`Unknown type: ${message.body.type}`); + } + + message.ack(); + } catch (error) { + console.error(`Failed: ${message.id}`, error); + message.retry({ delaySeconds: 300 }); + } + } + }, +}; +``` + +--- + +## ExecutionContext Methods + +### `waitUntil(promise)` + +Extend Worker lifetime beyond handler return. + +```typescript +export default { + async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise { + for (const message of batch.messages) { + await processMessage(message.body); + message.ack(); + + // Log asynchronously (doesn't block) + ctx.waitUntil( + env.LOGS.put(`log:${message.id}`, JSON.stringify({ + processedAt: Date.now(), + message: message.body, + })) + ); + } + }, +}; +``` + +--- + +### `passThroughOnException()` + +Continue processing even if handler throws. + +```typescript +export default { + async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise { + ctx.passThroughOnException(); + + // If this throws, Worker doesn't fail + // But unacknowledged messages will retry + await processMessages(batch.messages); + }, +}; +``` + +--- + +**Last Updated**: 2025-10-21 diff --git a/references/producer-api.md b/references/producer-api.md new file mode 100644 index 0000000..1aaef86 --- /dev/null +++ b/references/producer-api.md @@ -0,0 +1,337 @@ +# Producer API Reference + +Complete reference for sending messages to Cloudflare Queues from Workers. + +--- + +## Queue Binding + +Access queues via environment bindings configured in `wrangler.jsonc`: + +```jsonc +{ + "queues": { + "producers": [ + { + "binding": "MY_QUEUE", + "queue": "my-queue" + } + ] + } +} +``` + +**TypeScript:** + +```typescript +type Bindings = { + MY_QUEUE: Queue; +}; + +const app = new Hono<{ Bindings: Bindings }>(); + +app.post('/send', async (c) => { + await c.env.MY_QUEUE.send({ data: 'hello' }); + return c.json({ sent: true }); +}); +``` + +--- + +## `send()` - Send Single Message + +### Signature + +```typescript +interface Queue { + send(body: Body, options?: QueueSendOptions): Promise; +} + +interface QueueSendOptions { + delaySeconds?: number; // 0-43200 (12 hours) +} +``` + +### Parameters + +- **`body`** - Any JSON serializable value + - Must be compatible with structured clone algorithm + - Max size: 128 KB (including ~100 bytes metadata) + - Types: primitives, objects, arrays, Date, Map, Set, etc. + - NOT supported: Functions, Symbols, DOM nodes + +- **`options.delaySeconds`** (optional) + - Delay message delivery + - Range: 0-43200 seconds (0-12 hours) + - Default: 0 (immediate delivery) + +### Examples + +```typescript +// Send simple message +await env.MY_QUEUE.send({ userId: '123', action: 'welcome' }); + +// Send with delay (10 minutes) +await env.MY_QUEUE.send( + { userId: '123', action: 'reminder' }, + { delaySeconds: 600 } +); + +// Send complex object +await env.MY_QUEUE.send({ + type: 'order', + order: { + id: 'ORD-123', + items: [ + { sku: 'ITEM-1', quantity: 2, price: 19.99 }, + { sku: 'ITEM-2', quantity: 1, price: 29.99 }, + ], + total: 69.97, + customer: { + id: 'CUST-456', + email: 'user@example.com', + }, + metadata: { + source: 'web', + campaign: 'summer-sale', + }, + }, + timestamp: Date.now(), +}); + +// Send with Date objects +await env.MY_QUEUE.send({ + scheduledFor: new Date('2025-12-25T00:00:00Z'), + createdAt: new Date(), +}); +``` + +--- + +## `sendBatch()` - Send Multiple Messages + +### Signature + +```typescript +interface Queue { + sendBatch( + messages: Iterable>, + options?: QueueSendBatchOptions + ): Promise; +} + +interface MessageSendRequest { + body: Body; + delaySeconds?: number; +} + +interface QueueSendBatchOptions { + delaySeconds?: number; // Default delay for all messages +} +``` + +### Parameters + +- **`messages`** - Iterable of message objects + - Each message has `body` and optional `delaySeconds` + - Max 100 messages per batch + - Max 256 KB total batch size + - Can be Array, Set, Generator, etc. + +- **`options.delaySeconds`** (optional) + - Default delay applied to all messages + - Overridden by individual message `delaySeconds` + +### Examples + +```typescript +// Send batch of messages +await env.MY_QUEUE.sendBatch([ + { body: { userId: '1', action: 'email' } }, + { body: { userId: '2', action: 'email' } }, + { body: { userId: '3', action: 'email' } }, +]); + +// Send batch with individual delays +await env.MY_QUEUE.sendBatch([ + { body: { task: 'immediate' }, delaySeconds: 0 }, + { body: { task: '5-min' }, delaySeconds: 300 }, + { body: { task: '1-hour' }, delaySeconds: 3600 }, +]); + +// Send batch with default delay (overridable per message) +await env.MY_QUEUE.sendBatch( + [ + { body: { task: 'default-delay' } }, + { body: { task: 'custom-delay' }, delaySeconds: 600 }, + ], + { delaySeconds: 300 } // Default 5 minutes +); + +// Dynamic batch from database +const users = await getActiveUsers(); +await env.MY_QUEUE.sendBatch( + users.map(user => ({ + body: { + type: 'send-notification', + userId: user.id, + email: user.email, + message: 'You have a new message', + }, + })) +); + +// Generator pattern +async function* generateMessages() { + for (let i = 0; i < 100; i++) { + yield { + body: { taskId: i, priority: i % 3 }, + }; + } +} + +await env.MY_QUEUE.sendBatch(generateMessages()); +``` + +--- + +## Message Size Validation + +Messages must be ≤128 KB. Check size before sending: + +```typescript +async function sendWithValidation(queue: Queue, message: any) { + const messageStr = JSON.stringify(message); + const size = new TextEncoder().encode(messageStr).length; + const MAX_SIZE = 128 * 1024; // 128 KB + + if (size > MAX_SIZE) { + throw new Error( + `Message too large: ${size} bytes (max ${MAX_SIZE})` + ); + } + + await queue.send(message); +} +``` + +**Handling large messages:** + +```typescript +// Store large data in R2, send reference +if (size > 128 * 1024) { + const key = `messages/${crypto.randomUUID()}.json`; + await env.MY_BUCKET.put(key, JSON.stringify(largeMessage)); + + await env.MY_QUEUE.send({ + type: 'large-message', + r2Key: key, + metadata: { + size, + createdAt: Date.now(), + }, + }); +} +``` + +--- + +## Throughput Management + +Max throughput: 5,000 messages/second per queue. + +**Rate limiting:** + +```typescript +// Batch sends for better throughput +const messages = Array.from({ length: 1000 }, (_, i) => ({ + body: { id: i }, +})); + +// Send in batches of 100 (10 sendBatch calls vs 1000 send calls) +for (let i = 0; i < messages.length; i += 100) { + const batch = messages.slice(i, i + 100); + await env.MY_QUEUE.sendBatch(batch); +} + +// Add delay if needed +for (let i = 0; i < messages.length; i += 100) { + const batch = messages.slice(i, i + 100); + await env.MY_QUEUE.sendBatch(batch); + + if (i + 100 < messages.length) { + await new Promise(resolve => setTimeout(resolve, 100)); // 100ms + } +} +``` + +--- + +## Error Handling + +```typescript +try { + await env.MY_QUEUE.send(message); +} catch (error) { + if (error.message.includes('Too Many Requests')) { + // Throughput exceeded (>5000 msg/s) + console.error('Rate limited'); + } else if (error.message.includes('too large')) { + // Message >128 KB + console.error('Message too large'); + } else { + // Other error + console.error('Queue send failed:', error); + } +} +``` + +--- + +## Production Patterns + +### Idempotency Keys + +```typescript +await env.MY_QUEUE.send({ + idempotencyKey: crypto.randomUUID(), + orderId: 'ORD-123', + action: 'process', +}); +``` + +### Message Versioning + +```typescript +await env.MY_QUEUE.send({ + _version: 1, + _schema: 'order-v1', + orderId: 'ORD-123', + // ... +}); +``` + +### Correlation IDs + +```typescript +await env.MY_QUEUE.send({ + correlationId: requestId, + parentSpanId: traceId, + // ... +}); +``` + +### Priority Queues + +```typescript +// Use multiple queues for different priorities +if (priority === 'high') { + await env.HIGH_PRIORITY_QUEUE.send(message); +} else { + await env.LOW_PRIORITY_QUEUE.send(message); +} +``` + +--- + +**Last Updated**: 2025-10-21 diff --git a/references/wrangler-commands.md b/references/wrangler-commands.md new file mode 100644 index 0000000..2bf4060 --- /dev/null +++ b/references/wrangler-commands.md @@ -0,0 +1,392 @@ +# Wrangler Commands for Cloudflare Queues + +Complete reference for managing Cloudflare Queues via the `wrangler` CLI. + +--- + +## Queue Management + +### Create Queue + +```bash +npx wrangler queues create [OPTIONS] +``` + +**Options:** +- `--delivery-delay-secs ` - Default delay for all messages (0-43200) +- `--message-retention-period-secs ` - How long messages persist (60-1209600, default: 345600 / 4 days) + +**Examples:** + +```bash +# Create basic queue +npx wrangler queues create my-queue + +# Create with custom retention (7 days) +npx wrangler queues create my-queue --message-retention-period-secs 604800 + +# Create with default delivery delay (5 minutes) +npx wrangler queues create delayed-queue --delivery-delay-secs 300 +``` + +--- + +### List Queues + +```bash +npx wrangler queues list +``` + +**Output:** +``` +┌──────────────────┬─────────────┬──────────┐ +│ Name │ Consumers │ Messages │ +├──────────────────┼─────────────┼──────────┤ +│ my-queue │ 1 │ 0 │ +│ high-priority │ 2 │ 142 │ +│ my-dlq │ 1 │ 5 │ +└──────────────────┴─────────────┴──────────┘ +``` + +--- + +### Get Queue Info + +```bash +npx wrangler queues info +``` + +**Example:** + +```bash +npx wrangler queues info my-queue + +# Output: +# Queue: my-queue +# Message Retention: 345600 seconds (4 days) +# Delivery Delay: 0 seconds +# Consumers: 1 +# - my-consumer (batch_size: 10, batch_timeout: 5s, max_retries: 3) +# Backlog: 0 messages +``` + +--- + +### Update Queue + +```bash +npx wrangler queues update [OPTIONS] +``` + +**Options:** +- `--delivery-delay-secs ` - Update default delay +- `--message-retention-period-secs ` - Update retention period + +**Examples:** + +```bash +# Update retention to 14 days (max) +npx wrangler queues update my-queue --message-retention-period-secs 1209600 + +# Update delivery delay to 10 minutes +npx wrangler queues update my-queue --delivery-delay-secs 600 +``` + +--- + +### Delete Queue + +```bash +npx wrangler queues delete +``` + +**⚠️ WARNING:** +- Deletes ALL messages in the queue +- Cannot be undone +- Use with extreme caution in production + +**Example:** + +```bash +npx wrangler queues delete old-queue +``` + +--- + +## Consumer Management + +### Add Consumer + +```bash +npx wrangler queues consumer add [OPTIONS] +``` + +**Options:** +- `--batch-size ` - Max messages per batch (1-100, default: 10) +- `--batch-timeout ` - Max wait time (0-60, default: 5) +- `--message-retries ` - Max retry attempts (0-100, default: 3) +- `--max-concurrency ` - Limit concurrent consumers (default: auto-scale to 250) +- `--retry-delay-secs ` - Default retry delay +- `--dead-letter-queue ` - DLQ for failed messages + +**Examples:** + +```bash +# Basic consumer +npx wrangler queues consumer add my-queue my-consumer + +# Optimized for high throughput +npx wrangler queues consumer add my-queue my-consumer \ + --batch-size 100 \ + --batch-timeout 1 + +# With DLQ and retry settings +npx wrangler queues consumer add my-queue my-consumer \ + --batch-size 50 \ + --message-retries 5 \ + --retry-delay-secs 300 \ + --dead-letter-queue my-dlq + +# Limit concurrency for rate-limited APIs +npx wrangler queues consumer add api-queue api-consumer \ + --max-concurrency 10 +``` + +--- + +### Remove Consumer + +```bash +npx wrangler queues consumer remove +``` + +**Example:** + +```bash +npx wrangler queues consumer remove my-queue my-consumer +``` + +--- + +## Queue Operations + +### Purge Queue + +```bash +npx wrangler queues purge +``` + +**⚠️ WARNING:** +- Permanently deletes ALL messages +- Cannot be undone +- Use for clearing test data or stuck queues + +**Example:** + +```bash +npx wrangler queues purge test-queue +``` + +--- + +### Pause Delivery + +```bash +npx wrangler queues pause-delivery +``` + +**Use cases:** +- Maintenance on consumer Workers +- Debugging consumer issues +- Temporarily stop processing without deleting messages + +**Example:** + +```bash +npx wrangler queues pause-delivery my-queue +``` + +--- + +### Resume Delivery + +```bash +npx wrangler queues resume-delivery +``` + +**Example:** + +```bash +npx wrangler queues resume-delivery my-queue +``` + +--- + +## Event Subscriptions + +Event subscriptions automatically send messages to a queue when events occur in other Cloudflare services. + +### Create Subscription + +```bash +npx wrangler queues subscription create [OPTIONS] +``` + +**Options:** +- `--source ` - Event source (kv, r2, superSlurper, vectorize, workersAi.model, workersBuilds.worker, workflows.workflow) +- `--events ` - Comma-separated list of event types +- `--name ` - Subscription name (auto-generated if omitted) +- `--enabled` - Whether subscription is active (default: true) + +**Examples:** + +```bash +# Subscribe to R2 bucket events +npx wrangler queues subscription create my-queue \ + --source r2 \ + --events object-create,object-delete \ + --bucket-name my-bucket + +# Subscribe to KV namespace events +npx wrangler queues subscription create my-queue \ + --source kv \ + --events key-write,key-delete \ + --namespace-id abc123 + +# Subscribe to Worker build events +npx wrangler queues subscription create build-queue \ + --source workersBuilds.worker \ + --events build-complete,build-failed \ + --worker-name my-worker +``` + +--- + +### List Subscriptions + +```bash +npx wrangler queues subscription list [OPTIONS] +``` + +**Options:** +- `--page ` - Page number +- `--per-page ` - Results per page +- `--json` - Output as JSON + +**Example:** + +```bash +npx wrangler queues subscription list my-queue --json +``` + +--- + +### Get Subscription + +```bash +npx wrangler queues subscription get --id [--json] +``` + +**Example:** + +```bash +npx wrangler queues subscription get my-queue --id sub_123 --json +``` + +--- + +### Delete Subscription + +```bash +npx wrangler queues subscription delete --id +``` + +**Example:** + +```bash +npx wrangler queues subscription delete my-queue --id sub_123 +``` + +--- + +## Global Flags + +These flags work on all commands: + +- `--help` - Show help +- `--config ` - Path to wrangler.toml or wrangler.jsonc +- `--cwd ` - Run as if started in specified directory + +--- + +## Complete Workflow Example + +```bash +# 1. Create queues +npx wrangler queues create my-queue +npx wrangler queues create my-dlq + +# 2. Create and deploy producer Worker +cd my-producer +npm create cloudflare@latest -- --type hello-world --ts +# Add producer binding to wrangler.jsonc +npm run deploy + +# 3. Create and deploy consumer Worker +cd ../my-consumer +npm create cloudflare@latest -- --type hello-world --ts +# Add consumer handler +npm run deploy + +# 4. Add consumer to queue +npx wrangler queues consumer add my-queue my-consumer \ + --batch-size 50 \ + --message-retries 5 \ + --dead-letter-queue my-dlq + +# 5. Monitor queue +npx wrangler queues info my-queue + +# 6. Watch consumer logs +npx wrangler tail my-consumer + +# 7. If needed, pause delivery +npx wrangler queues pause-delivery my-queue + +# 8. Resume delivery +npx wrangler queues resume-delivery my-queue +``` + +--- + +## Troubleshooting + +### Check queue backlog + +```bash +npx wrangler queues info my-queue | grep "Backlog" +``` + +### Clear stuck queue + +```bash +npx wrangler queues purge my-queue +``` + +### Verify consumer is attached + +```bash +npx wrangler queues info my-queue | grep "Consumers" +``` + +### Check for delivery paused + +```bash +npx wrangler queues info my-queue +# Look for "Delivery: paused" +``` + +--- + +**Last Updated**: 2025-10-21 +**Wrangler Version**: 4.43.0+ diff --git a/templates/queues-consumer-basic.ts b/templates/queues-consumer-basic.ts new file mode 100644 index 0000000..c736e85 --- /dev/null +++ b/templates/queues-consumer-basic.ts @@ -0,0 +1,127 @@ +/** + * Basic Queue Consumer (Implicit Acknowledgement) + * + * Use when: Operations are idempotent (safe to retry) + * Examples: Logging, sending emails (idempotent with deduplication), metrics + * + * How it works: + * - If handler returns successfully → all messages acknowledged + * - If handler throws error → entire batch retried + * + * Setup: + * 1. Create queue: npx wrangler queues create my-queue + * 2. Add consumer binding to wrangler.jsonc (see wrangler-queues-config.jsonc) + * 3. Deploy: npm run deploy + */ + +type Env = { + // Add your bindings here (D1, KV, R2, etc.) + LOGS: KVNamespace; +}; + +export default { + async queue( + batch: MessageBatch, + env: Env, + ctx: ExecutionContext + ): Promise { + console.log(`Processing batch of ${batch.messages.length} messages from queue: ${batch.queue}`); + + // Process each message + for (const message of batch.messages) { + console.log(`Message ID: ${message.id}`); + console.log(`Attempt: ${message.attempts}`); + console.log(`Timestamp: ${message.timestamp}`); + console.log(`Body:`, message.body); + + // Your processing logic + await processMessage(message.body, env); + } + + // Implicit acknowledgement: + // Returning successfully acknowledges ALL messages + // If this function throws, ALL messages will be retried + }, +}; + +/** + * Process individual message + */ +async function processMessage(body: any, env: Env) { + switch (body.type) { + case 'send-email': + await sendEmail(body); + break; + + case 'log-event': + await logEvent(body, env); + break; + + case 'update-metrics': + await updateMetrics(body, env); + break; + + default: + console.warn(`Unknown message type: ${body.type}`); + } +} + +/** + * Example: Send email (idempotent with external deduplication) + */ +async function sendEmail(data: any) { + console.log(`Sending email to ${data.to}`); + + // Call email API (e.g., Resend, SendGrid) + const response = await fetch('https://api.resend.com/emails', { + method: 'POST', + headers: { + 'Authorization': `Bearer ${process.env.RESEND_API_KEY}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + from: 'noreply@example.com', + to: data.to, + subject: data.subject, + html: data.html, + }), + }); + + if (!response.ok) { + throw new Error(`Failed to send email: ${response.statusText}`); + } + + console.log(`Email sent successfully to ${data.to}`); +} + +/** + * Example: Log event to KV (idempotent) + */ +async function logEvent(data: any, env: Env) { + const logKey = `log:${data.eventId}:${Date.now()}`; + + await env.LOGS.put(logKey, JSON.stringify({ + event: data.event, + userId: data.userId, + timestamp: new Date().toISOString(), + metadata: data.metadata, + }), { + expirationTtl: 86400 * 30, // 30 days + }); + + console.log(`Event logged: ${logKey}`); +} + +/** + * Example: Update metrics (idempotent aggregation) + */ +async function updateMetrics(data: any, env: Env) { + // Increment counter in KV + const key = `metric:${data.metric}`; + const current = await env.LOGS.get(key); + const count = current ? parseInt(current) : 0; + + await env.LOGS.put(key, String(count + 1)); + + console.log(`Metric ${data.metric} updated: ${count + 1}`); +} diff --git a/templates/queues-consumer-explicit-ack.ts b/templates/queues-consumer-explicit-ack.ts new file mode 100644 index 0000000..dcba97a --- /dev/null +++ b/templates/queues-consumer-explicit-ack.ts @@ -0,0 +1,157 @@ +/** + * Queue Consumer with Explicit Acknowledgement + * + * Use when: Operations are non-idempotent (NOT safe to retry) + * Examples: Database writes, payment processing, API calls with side effects + * + * How it works: + * - Call message.ack() after successful processing + * - Only acknowledged messages are removed from queue + * - Failed messages can retry independently + * + * Setup: + * 1. Create queue: npx wrangler queues create my-queue + * 2. Create DLQ: npx wrangler queues create my-dlq + * 3. Add consumer binding with DLQ (see wrangler-queues-config.jsonc) + * 4. Deploy: npm run deploy + */ + +type Env = { + DB: D1Database; + API_KEY: string; +}; + +export default { + async queue( + batch: MessageBatch, + env: Env, + ctx: ExecutionContext + ): Promise { + console.log(`Processing batch of ${batch.messages.length} messages`); + + // Process each message individually + for (const message of batch.messages) { + try { + // Non-idempotent operation + await processNonIdempotent(message.body, env); + + // CRITICAL: Explicitly acknowledge success + message.ack(); + + console.log(`✅ Message ${message.id} processed successfully`); + } catch (error) { + console.error(`❌ Failed to process message ${message.id}:`, error); + + // Don't call ack() - message will retry + // After max_retries, sent to DLQ (if configured) + } + } + + // Note: We DON'T throw an error here + // Only unacknowledged messages will retry + }, +}; + +/** + * Process message with non-idempotent operations + */ +async function processNonIdempotent(body: any, env: Env) { + switch (body.type) { + case 'create-order': + await createOrder(body, env); + break; + + case 'charge-payment': + await chargePayment(body, env); + break; + + case 'update-inventory': + await updateInventory(body, env); + break; + + default: + throw new Error(`Unknown message type: ${body.type}`); + } +} + +/** + * Example: Create database record (non-idempotent) + */ +async function createOrder(data: any, env: Env) { + // Insert into database (can't be retried safely) + const result = await env.DB.prepare(` + INSERT INTO orders (id, user_id, total, status, created_at) + VALUES (?, ?, ?, ?, ?) + `) + .bind( + data.orderId, + data.userId, + data.total, + 'pending', + new Date().toISOString() + ) + .run(); + + if (!result.success) { + throw new Error(`Failed to create order: ${data.orderId}`); + } + + console.log(`Order created: ${data.orderId}`); +} + +/** + * Example: Charge payment (non-idempotent) + */ +async function chargePayment(data: any, env: Env) { + // Call payment API (can't be retried safely without deduplication) + const response = await fetch('https://api.stripe.com/v1/charges', { + method: 'POST', + headers: { + 'Authorization': `Bearer ${env.API_KEY}`, + 'Content-Type': 'application/x-www-form-urlencoded', + }, + body: new URLSearchParams({ + amount: String(data.amount), + currency: data.currency, + source: data.token, + description: data.description, + }), + }); + + if (!response.ok) { + const error = await response.text(); + throw new Error(`Payment failed: ${error}`); + } + + const charge = await response.json(); + console.log(`Payment charged: ${charge.id}`); + + // Update database with charge ID + await env.DB.prepare(` + UPDATE orders SET payment_id = ?, status = 'paid' WHERE id = ? + `) + .bind(charge.id, data.orderId) + .run(); +} + +/** + * Example: Update inventory (non-idempotent) + */ +async function updateInventory(data: any, env: Env) { + for (const item of data.items) { + // Decrement inventory count + const result = await env.DB.prepare(` + UPDATE inventory + SET quantity = quantity - ? + WHERE sku = ? AND quantity >= ? + `) + .bind(item.quantity, item.sku, item.quantity) + .run(); + + if (result.changes === 0) { + throw new Error(`Insufficient inventory for SKU: ${item.sku}`); + } + + console.log(`Inventory updated: ${item.sku} (-${item.quantity})`); + } +} diff --git a/templates/queues-dlq-pattern.ts b/templates/queues-dlq-pattern.ts new file mode 100644 index 0000000..a985edb --- /dev/null +++ b/templates/queues-dlq-pattern.ts @@ -0,0 +1,215 @@ +/** + * Dead Letter Queue (DLQ) Consumer + * + * Handles messages that failed after max retries in the main queue. + * + * Use when: You need to: + * - Log permanently failed messages + * - Alert ops team about failures + * - Store failed messages for manual review + * - Implement custom retry logic + * + * Setup: + * 1. Create DLQ: npx wrangler queues create my-dlq + * 2. Configure main queue consumer with DLQ: + * "dead_letter_queue": "my-dlq" in wrangler.jsonc + * 3. Create consumer for DLQ (this file) + * 4. Deploy both consumers + */ + +type Env = { + DB: D1Database; + ALERTS: KVNamespace; + MAIN_QUEUE: Queue; // Reference to main queue for retry +}; + +export default { + async queue( + batch: MessageBatch, + env: Env, + ctx: ExecutionContext + ): Promise { + console.log(`⚠️ Processing ${batch.messages.length} FAILED messages from DLQ`); + + for (const message of batch.messages) { + try { + await handleFailedMessage(message, env); + + // Acknowledge to remove from DLQ + message.ack(); + } catch (error) { + console.error(`Failed to process DLQ message ${message.id}:`, error); + // Don't ack - will retry in DLQ + } + } + }, +}; + +/** + * Handle permanently failed message + */ +async function handleFailedMessage(message: Message, env: Env) { + console.log(`💀 Dead Letter Message:`); + console.log(` ID: ${message.id}`); + console.log(` Attempts: ${message.attempts}`); + console.log(` Original Timestamp: ${message.timestamp}`); + console.log(` Body:`, message.body); + + // 1. Store in database for manual review + await storeFailed Message(message, env); + + // 2. Send alert to ops team + await sendAlert(message, env); + + // 3. Optional: Implement custom retry logic + if (shouldRetryInMainQueue(message)) { + await retryInMainQueue(message, env); + } +} + +/** + * Store failed message in database + */ +async function storeFailedMessage(message: Message, env: Env) { + await env.DB.prepare(` + INSERT INTO failed_messages ( + id, + queue_name, + body, + attempts, + original_timestamp, + failed_at, + error_details + ) VALUES (?, ?, ?, ?, ?, ?, ?) + `) + .bind( + message.id, + 'my-queue', // Or get from message.body if you include it + JSON.stringify(message.body), + message.attempts, + message.timestamp.toISOString(), + new Date().toISOString(), + JSON.stringify({ + reason: 'Max retries exceeded', + lastAttempt: message.attempts, + }) + ) + .run(); + + console.log(`Stored failed message in database: ${message.id}`); +} + +/** + * Send alert to ops team + */ +async function sendAlert(message: Message, env: Env) { + // Send to monitoring service (e.g., PagerDuty, Slack, Email) + const alert = { + severity: 'high', + title: 'Queue Message Permanently Failed', + description: `Message ${message.id} failed after ${message.attempts} attempts`, + details: { + messageId: message.id, + attempts: message.attempts, + body: message.body, + timestamp: message.timestamp.toISOString(), + }, + }; + + // Example: Store in KV for alert aggregation + const alertKey = `alert:${message.id}`; + await env.ALERTS.put(alertKey, JSON.stringify(alert), { + expirationTtl: 86400 * 7, // 7 days + }); + + // Example: Send webhook to Slack + await fetch(process.env.SLACK_WEBHOOK_URL || '', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + text: `🚨 Queue DLQ Alert: Message ${message.id} failed permanently`, + blocks: [ + { + type: 'section', + text: { + type: 'mrkdwn', + text: `*Message ID:* ${message.id}\n*Attempts:* ${message.attempts}\n*Type:* ${message.body.type}`, + }, + }, + ], + }), + }); + + console.log(`Alert sent for message: ${message.id}`); +} + +/** + * Determine if message should be retried in main queue + * (e.g., after fixing a bug, or for specific message types) + */ +function shouldRetryInMainQueue(message: Message): boolean { + // Example: Retry if it's a payment message and attempts < 10 + if (message.body.type === 'charge-payment' && message.attempts < 10) { + return true; + } + + // Example: Retry if it failed due to a specific error + if (message.body.retryable === true) { + return true; + } + + return false; +} + +/** + * Retry message in main queue (with delay) + */ +async function retryInMainQueue(message: Message, env: Env) { + console.log(`Retrying message ${message.id} in main queue`); + + // Send back to main queue with exponential delay + const delaySeconds = Math.min( + 3600 * Math.pow(2, message.attempts - 3), // Start from where DLQ picked up + 43200 // Max 12 hours + ); + + await env.MAIN_QUEUE.send( + { + ...message.body, + retriedFromDLQ: true, + originalMessageId: message.id, + dlqAttempts: message.attempts, + }, + { delaySeconds } + ); + + console.log(`Message ${message.id} re-queued with ${delaySeconds}s delay`); +} + +/** + * Manual retry endpoint (call via REST API or dashboard) + */ +export async function manualRetry(messageId: string, env: Env) { + // Fetch failed message from database + const result = await env.DB.prepare( + 'SELECT * FROM failed_messages WHERE id = ?' + ) + .bind(messageId) + .first(); + + if (!result) { + throw new Error(`Message ${messageId} not found in DLQ`); + } + + const body = JSON.parse(result.body as string); + + // Send back to main queue + await env.MAIN_QUEUE.send({ + ...body, + manualRetry: true, + retriedBy: 'admin', + retriedAt: new Date().toISOString(), + }); + + console.log(`Manually retried message: ${messageId}`); +} diff --git a/templates/queues-producer.ts b/templates/queues-producer.ts new file mode 100644 index 0000000..49af350 --- /dev/null +++ b/templates/queues-producer.ts @@ -0,0 +1,243 @@ +/** + * Queue Producer Example + * + * Shows how to send messages to Cloudflare Queues from a Hono Worker. + * + * Setup: + * 1. Create queue: npx wrangler queues create my-queue + * 2. Add producer binding to wrangler.jsonc (see wrangler-queues-config.jsonc) + * 3. Deploy: npm run deploy + */ + +import { Hono } from 'hono'; + +type Bindings = { + MY_QUEUE: Queue; +}; + +const app = new Hono<{ Bindings: Bindings }>(); + +// ============================================================================ +// Send Single Message +// ============================================================================ + +app.post('/send', async (c) => { + const body = await c.req.json(); + + // Simple send + await c.env.MY_QUEUE.send({ + type: 'process-order', + orderId: body.orderId, + userId: body.userId, + timestamp: Date.now(), + }); + + return c.json({ status: 'queued' }); +}); + +// ============================================================================ +// Send Message with Delay +// ============================================================================ + +app.post('/send-delayed', async (c) => { + const { task, delayMinutes } = await c.req.json(); + + // Delay message delivery + await c.env.MY_QUEUE.send( + { + type: 'scheduled-task', + task, + scheduledFor: Date.now() + (delayMinutes * 60 * 1000), + }, + { + delaySeconds: delayMinutes * 60, // Convert minutes to seconds + } + ); + + return c.json({ + status: 'scheduled', + delayMinutes, + processAt: new Date(Date.now() + (delayMinutes * 60 * 1000)).toISOString(), + }); +}); + +// ============================================================================ +// Send Batch of Messages +// ============================================================================ + +app.post('/send-batch', async (c) => { + const items = await c.req.json>(); + + // Validate batch size (max 100 messages) + if (items.length > 100) { + return c.json( + { error: 'Maximum 100 messages per batch' }, + 400 + ); + } + + // Send batch + await c.env.MY_QUEUE.sendBatch( + items.map((item) => ({ + body: { + type: 'batch-process', + itemId: item.id, + data: item.data, + }, + })) + ); + + return c.json({ + status: 'queued', + count: items.length, + }); +}); + +// ============================================================================ +// Send Batch with Individual Delays +// ============================================================================ + +app.post('/send-scheduled-batch', async (c) => { + const tasks = await c.req.json>(); + + await c.env.MY_QUEUE.sendBatch( + tasks.map((task) => ({ + body: { + type: 'scheduled-task', + task: task.task, + }, + delaySeconds: task.delayMinutes * 60, + })) + ); + + return c.json({ + status: 'scheduled', + count: tasks.length, + }); +}); + +// ============================================================================ +// Validate Message Size (< 128 KB) +// ============================================================================ + +app.post('/send-validated', async (c) => { + const body = await c.req.json(); + + // Check message size + const messageSize = new TextEncoder().encode(JSON.stringify(body)).length; + const MAX_SIZE = 128 * 1024; // 128 KB + + if (messageSize > MAX_SIZE) { + return c.json( + { + error: 'Message too large', + size: messageSize, + maxSize: MAX_SIZE, + }, + 400 + ); + } + + await c.env.MY_QUEUE.send(body); + + return c.json({ + status: 'queued', + messageSize, + }); +}); + +// ============================================================================ +// Real-World Example: E-commerce Order Processing +// ============================================================================ + +interface Order { + orderId: string; + userId: string; + items: Array<{ sku: string; quantity: number; price: number }>; + total: number; + email: string; +} + +app.post('/orders', async (c) => { + const order: Order = await c.req.json(); + + // Queue multiple tasks for this order + await c.env.MY_QUEUE.sendBatch([ + // Immediate: Send confirmation email + { + body: { + type: 'send-email', + template: 'order-confirmation', + to: order.email, + orderId: order.orderId, + }, + }, + + // Immediate: Update inventory + { + body: { + type: 'update-inventory', + items: order.items, + orderId: order.orderId, + }, + }, + + // Delayed: Send shipping notification (estimated 2 hours) + { + body: { + type: 'send-email', + template: 'shipping-notification', + to: order.email, + orderId: order.orderId, + }, + delaySeconds: 2 * 60 * 60, // 2 hours + }, + + // Delayed: Request review (3 days later) + { + body: { + type: 'send-email', + template: 'review-request', + to: order.email, + orderId: order.orderId, + }, + delaySeconds: 3 * 24 * 60 * 60, // 3 days + }, + ]); + + return c.json({ + status: 'success', + orderId: order.orderId, + tasksQueued: 4, + }); +}); + +// ============================================================================ +// Webhook Handler Example +// ============================================================================ + +app.post('/webhooks/:service', async (c) => { + const service = c.req.param('service'); + const payload = await c.req.json(); + + // Queue webhook for async processing + await c.env.MY_QUEUE.send({ + type: 'webhook', + service, + payload, + receivedAt: Date.now(), + }); + + // Respond immediately (don't block webhook) + return c.json({ received: true }, 200); +}); + +// ============================================================================ +// Health Check +// ============================================================================ + +app.get('/health', (c) => { + return c.json({ status: 'ok' }); +}); + +export default app; diff --git a/templates/queues-retry-with-delay.ts b/templates/queues-retry-with-delay.ts new file mode 100644 index 0000000..dc94101 --- /dev/null +++ b/templates/queues-retry-with-delay.ts @@ -0,0 +1,241 @@ +/** + * Queue Consumer with Exponential Backoff Retry + * + * Use when: Calling rate-limited APIs or handling temporary failures + * + * Strategy: + * - Retry with increasing delays: 1m → 2m → 4m → 8m → ... + * - Different delays for different error types + * - Max delay cap to prevent excessive waits + * + * Setup: + * 1. Create queue: npx wrangler queues create api-tasks + * 2. Create DLQ: npx wrangler queues create api-tasks-dlq + * 3. Configure consumer with higher max_retries (e.g., 10) + * 4. Deploy: npm run deploy + */ + +type Env = { + DB: D1Database; + API_KEY: string; +}; + +export default { + async queue( + batch: MessageBatch, + env: Env, + ctx: ExecutionContext + ): Promise { + console.log(`Processing batch of ${batch.messages.length} messages`); + + for (const message of batch.messages) { + try { + await processWithRetry(message, env); + message.ack(); + } catch (error) { + await handleError(message, error); + } + } + }, +}; + +/** + * Process message with smart retry logic + */ +async function processWithRetry(message: Message, env: Env) { + const { type, data } = message.body; + + console.log(`Processing ${type} (attempt ${message.attempts})`); + + switch (type) { + case 'call-api': + await callExternalAPI(data, message.attempts); + break; + + case 'process-webhook': + await processWebhook(data); + break; + + default: + throw new Error(`Unknown message type: ${type}`); + } +} + +/** + * Call external API with retry handling + */ +async function callExternalAPI(data: any, attempts: number) { + const response = await fetch(data.url, { + method: data.method || 'POST', + headers: { + 'Content-Type': 'application/json', + ...data.headers, + }, + body: JSON.stringify(data.payload), + }); + + // Handle different response codes + if (response.ok) { + console.log(`✅ API call successful`); + return await response.json(); + } + + // Rate limiting + if (response.status === 429) { + const retryAfter = response.headers.get('Retry-After'); + const delaySeconds = retryAfter ? parseInt(retryAfter) : undefined; + + throw new RateLimitError('Rate limited', delaySeconds, attempts); + } + + // Server errors (500-599) - retry + if (response.status >= 500) { + throw new ServerError(`Server error: ${response.status}`, attempts); + } + + // Client errors (400-499) - don't retry + if (response.status >= 400) { + const error = await response.text(); + throw new ClientError(`Client error: ${error}`); + } + + throw new Error(`Unexpected response: ${response.status}`); +} + +/** + * Process webhook with timeout + */ +async function processWebhook(data: any) { + // Simulate processing + await new Promise(resolve => setTimeout(resolve, 1000)); + + console.log(`Webhook processed: ${data.id}`); +} + +/** + * Handle errors with appropriate retry strategy + */ +async function handleError(message: Message, error: any) { + console.error(`Error processing message ${message.id}:`, error); + + // Rate limit error - use suggested delay or exponential backoff + if (error instanceof RateLimitError) { + const delaySeconds = error.suggestedDelay || calculateExponentialBackoff( + message.attempts, + 60, // Base delay: 1 minute + 3600 // Max delay: 1 hour + ); + + console.log(`⏰ Rate limited. Retrying in ${delaySeconds}s (attempt ${message.attempts})`); + + message.retry({ delaySeconds }); + return; + } + + // Server error - exponential backoff + if (error instanceof ServerError) { + const delaySeconds = calculateExponentialBackoff( + message.attempts, + 30, // Base delay: 30 seconds + 1800 // Max delay: 30 minutes + ); + + console.log(`🔄 Server error. Retrying in ${delaySeconds}s (attempt ${message.attempts})`); + + message.retry({ delaySeconds }); + return; + } + + // Client error - don't retry (will go to DLQ) + if (error instanceof ClientError) { + console.error(`❌ Client error. Not retrying: ${error.message}`); + // Don't call ack() or retry() - will fail and go to DLQ + return; + } + + // Unknown error - retry with exponential backoff + const delaySeconds = calculateExponentialBackoff( + message.attempts, + 60, // Base delay: 1 minute + 7200 // Max delay: 2 hours + ); + + console.log(`⚠️ Unknown error. Retrying in ${delaySeconds}s (attempt ${message.attempts})`); + + message.retry({ delaySeconds }); +} + +/** + * Calculate exponential backoff delay + * + * Formula: min(baseDelay * 2^(attempts-1), maxDelay) + * + * Example (baseDelay=60, maxDelay=3600): + * - Attempt 1: 60s (1 min) + * - Attempt 2: 120s (2 min) + * - Attempt 3: 240s (4 min) + * - Attempt 4: 480s (8 min) + * - Attempt 5: 960s (16 min) + * - Attempt 6: 1920s (32 min) + * - Attempt 7+: 3600s (1 hour) - capped + */ +function calculateExponentialBackoff( + attempts: number, + baseDelay: number, + maxDelay: number +): number { + const delay = baseDelay * Math.pow(2, attempts - 1); + return Math.min(delay, maxDelay); +} + +/** + * Calculate jittered backoff (prevents thundering herd) + * + * Adds randomness to delay to spread out retries + */ +function calculateJitteredBackoff( + attempts: number, + baseDelay: number, + maxDelay: number +): number { + const exponentialDelay = baseDelay * Math.pow(2, attempts - 1); + const delay = Math.min(exponentialDelay, maxDelay); + + // Add jitter: ±25% randomness + const jitter = delay * 0.25 * (Math.random() * 2 - 1); + + return Math.floor(delay + jitter); +} + +// ============================================================================ +// Custom Error Classes +// ============================================================================ + +class RateLimitError extends Error { + suggestedDelay?: number; + attempts: number; + + constructor(message: string, suggestedDelay?: number, attempts: number = 1) { + super(message); + this.name = 'RateLimitError'; + this.suggestedDelay = suggestedDelay; + this.attempts = attempts; + } +} + +class ServerError extends Error { + attempts: number; + + constructor(message: string, attempts: number = 1) { + super(message); + this.name = 'ServerError'; + this.attempts = attempts; + } +} + +class ClientError extends Error { + constructor(message: string) { + super(message); + this.name = 'ClientError'; + } +} diff --git a/templates/wrangler-queues-config.jsonc b/templates/wrangler-queues-config.jsonc new file mode 100644 index 0000000..2305f6a --- /dev/null +++ b/templates/wrangler-queues-config.jsonc @@ -0,0 +1,57 @@ +{ + "name": "my-worker", + "main": "src/index.ts", + "compatibility_date": "2025-10-11", + + // Queue Bindings Configuration + "queues": { + + // Producer Configuration (send messages TO queues) + "producers": [ + { + "binding": "MY_QUEUE", // Available as env.MY_QUEUE in code + "queue": "my-queue", // Queue name (must be created first) + "delivery_delay": 0 // Optional: delay all messages (0-43200 seconds) + }, + { + "binding": "HIGH_PRIORITY_QUEUE", + "queue": "high-priority-queue" + }, + { + "binding": "LOW_PRIORITY_QUEUE", + "queue": "low-priority-queue", + "delivery_delay": 60 // Delay all messages by 1 minute + } + ], + + // Consumer Configuration (receive messages FROM queues) + "consumers": [ + { + "queue": "my-queue", // Queue to consume from + "max_batch_size": 10, // Max messages per batch (1-100, default: 10) + "max_batch_timeout": 5, // Max seconds to wait (0-60, default: 5) + "max_retries": 3, // Max retry attempts (0-100, default: 3) + "dead_letter_queue": "my-dlq", // Optional: where failed messages go + "retry_delay": 0, // Optional: delay retries (seconds) + "max_concurrency": null // Optional: limit concurrent consumers (default: auto-scale to 250) + }, + { + "queue": "high-priority-queue", + "max_batch_size": 50, // Larger batches for high volume + "max_batch_timeout": 1, // Low latency + "max_retries": 5, + "dead_letter_queue": "priority-dlq" + }, + { + "queue": "my-dlq", // Consumer for dead letter queue + "max_batch_size": 10, + "max_batch_timeout": 30 // Can wait longer for DLQ + } + ] + }, + + // Optional: Increase CPU limit for long-running consumers + "limits": { + "cpu_ms": 30000 // 30 seconds (default). Can be increased to 300000 (5 minutes) + } +}