commit f6fce7220ba338370b27ac524e977b4b9f392a2a Author: Zhongwei Li Date: Sun Nov 30 08:24:41 2025 +0800 Initial commit diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..247c7a2 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,12 @@ +{ + "name": "cloudflare-workflows", + "description": "Build durable, long-running workflows on Cloudflare Workers with automatic retries, state persistence, and multi-step orchestration. Supports step.do, step.sleep, step.waitForEvent, and runs for hours to days. Use when: creating long-running workflows, implementing retry logic, building event-driven processes, coordinating API calls, scheduling multi-step tasks, or troubleshooting NonRetryableError, I/O context, serialization errors, or workflow execution failures.", + "version": "1.0.0", + "author": { + "name": "Jeremy Dawes", + "email": "jeremy@jezweb.net" + }, + "skills": [ + "./" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..7609ac5 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# cloudflare-workflows + +Build durable, long-running workflows on Cloudflare Workers with automatic retries, state persistence, and multi-step orchestration. Supports step.do, step.sleep, step.waitForEvent, and runs for hours to days. Use when: creating long-running workflows, implementing retry logic, building event-driven processes, coordinating API calls, scheduling multi-step tasks, or troubleshooting NonRetryableError, I/O context, serialization errors, or workflow execution failures. diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..a76219a --- /dev/null +++ b/SKILL.md @@ -0,0 +1,589 @@ +--- +name: cloudflare-workflows +description: | + Build durable workflows with Cloudflare Workflows (GA April 2025). Features step.do, step.sleep, waitForEvent, + Vitest testing, and runs for hours to days with automatic retries and state persistence. + + Use when: creating long-running workflows, implementing retry logic, building event-driven processes, + testing workflows with cloudflare:test, coordinating API calls, or troubleshooting NonRetryableError, + I/O context errors, serialization failures. + + Keywords: cloudflare workflows, workflows workers, durable execution, workflow step, + WorkflowEntrypoint, step.do, step.sleep, workflow retries, NonRetryableError, + workflow state, wrangler workflows, workflow events, long-running tasks, step.sleepUntil, + step.waitForEvent, workflow bindings, vitest testing, cloudflare:test, introspectWorkflowInstance +license: MIT +--- + +# Cloudflare Workflows + +**Status**: Production Ready ✅ (GA since April 2025) +**Last Updated**: 2025-11-25 +**Dependencies**: cloudflare-worker-base (for Worker setup) +**Latest Versions**: wrangler@4.50.0, @cloudflare/workers-types@4.20251121.0 + +**Recent Updates (2025)**: +- **April 2025**: Workflows GA release - waitForEvent API, Vitest testing, CPU time metrics, 4,500 concurrent instances +- **October 2025**: Instance creation rate 10x faster (100/sec), concurrency increased to 10,000 +- **2025 Limits**: Max steps 1,024, state persistence 1MB/step (100MB-1GB per instance), event payloads 1MB, CPU time 5 min max +- **Testing**: cloudflare:test module with introspectWorkflowInstance, disableSleeps, mockStepResult, mockEvent modifiers +- **Platform**: Waiting instances don't count toward concurrency, retention 3-30 days, subrequests 50-1,000 + +--- + +## Quick Start (5 Minutes) + +```bash +# 1. Scaffold project +npm create cloudflare@latest my-workflow -- --template cloudflare/workflows-starter --git --deploy false +cd my-workflow + +# 2. Configure wrangler.jsonc +{ + "name": "my-workflow", + "main": "src/index.ts", + "compatibility_date": "2025-11-25", + "workflows": [{ + "name": "my-workflow", + "binding": "MY_WORKFLOW", + "class_name": "MyWorkflow" + }] +} + +# 3. Create workflow (src/index.ts) +import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; + +export class MyWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const result = await step.do('process', async () => { /* work */ }); + await step.sleep('wait', '1 hour'); + await step.do('continue', async () => { /* more work */ }); + } +} + +# 4. Deploy and test +npm run deploy +npx wrangler workflows instances list my-workflow +``` + +**CRITICAL**: Extends `WorkflowEntrypoint`, implements `run()` with `step` methods, bindings in wrangler.jsonc + +--- + + +## Step Methods + +### step.do() - Execute Work + +```typescript +step.do(name: string, config?: WorkflowStepConfig, callback: () => Promise): Promise +``` + +**Parameters:** +- `name` - Step name (for observability) +- `config` (optional) - Retry configuration (retries, timeout, backoff) +- `callback` - Async function that does the work + +**Returns:** Value from callback (must be serializable) + +**Example:** +```typescript +const result = await step.do('call API', { retries: { limit: 10, delay: '10s', backoff: 'exponential' }, timeout: '5 min' }, async () => { + return await fetch('https://api.example.com/data').then(r => r.json()); +}); +``` + +**CRITICAL - Serialization:** +- ✅ Allowed: string, number, boolean, Array, Object, null +- ❌ Forbidden: Function, Symbol, circular references, undefined +- Throws error if return value isn't JSON serializable + +--- + +### step.sleep() - Relative Sleep + +```typescript +step.sleep(name: string, duration: WorkflowDuration): Promise +``` + +**Parameters:** +- `name` - Step name +- `duration` - Number (ms) or string: `"second"`, `"minute"`, `"hour"`, `"day"`, `"week"`, `"month"`, `"year"` (plural forms accepted) + +**Examples:** +```typescript +await step.sleep('wait 5 minutes', '5 minutes'); +await step.sleep('wait 1 hour', '1 hour'); +await step.sleep('wait 2 days', '2 days'); +await step.sleep('wait 30 seconds', 30000); // milliseconds +``` + +**Note:** Resuming workflows take priority over new instances. Sleeps don't count toward step limits. + +--- + +### step.sleepUntil() - Sleep to Specific Date + +```typescript +step.sleepUntil(name: string, timestamp: Date | number): Promise +``` + +**Parameters:** +- `name` - Step name +- `timestamp` - Date object or UNIX timestamp (milliseconds) + +**Examples:** +```typescript +await step.sleepUntil('wait for launch', new Date('2025-12-25T00:00:00Z')); +await step.sleepUntil('wait until time', Date.parse('24 Oct 2024 13:00:00 UTC')); +``` + +--- + +### step.waitForEvent() - Wait for External Event (GA April 2025) + +```typescript +step.waitForEvent(name: string, options: { type: string; timeout?: string | number }): Promise +``` + +**Parameters:** +- `name` - Step name +- `options.type` - Event type to match +- `options.timeout` (optional) - Max wait time (default: 24 hours, max: 30 days) + +**Returns:** Event payload sent via `instance.sendEvent()` + +**Example:** +```typescript +export class PaymentWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + await step.do('create payment', async () => { /* Stripe API */ }); + + const webhookData = await step.waitForEvent( + 'wait for payment confirmation', + { type: 'stripe-webhook', timeout: '1 hour' } + ); + + if (webhookData.status === 'succeeded') { + await step.do('fulfill order', async () => { /* fulfill */ }); + } + } +} + +// Worker sends event to workflow +export default { + async fetch(req: Request, env: Env): Promise { + if (req.url.includes('/webhook/stripe')) { + const instance = await env.PAYMENT_WORKFLOW.get(instanceId); + await instance.sendEvent({ type: 'stripe-webhook', payload: await req.json() }); + return new Response('OK'); + } + } +}; +``` + +**Timeout handling:** +```typescript +try { + const event = await step.waitForEvent('wait for user', { type: 'user-submitted', timeout: '10 minutes' }); +} catch (error) { + await step.do('send reminder', async () => { /* reminder */ }); +} +``` + +--- + +## WorkflowStepConfig + +```typescript +interface WorkflowStepConfig { + retries?: { + limit: number; // Max attempts (Infinity allowed) + delay: string | number; // Delay between retries + backoff?: 'constant' | 'linear' | 'exponential'; + }; + timeout?: string | number; // Max time per attempt +} +``` + +**Default:** `{ retries: { limit: 5, delay: 10000, backoff: 'exponential' }, timeout: '10 minutes' }` + +**Backoff Examples:** +```typescript +// Constant: 30s, 30s, 30s +{ retries: { limit: 3, delay: '30 seconds', backoff: 'constant' } } + +// Linear: 1m, 2m, 3m, 4m, 5m +{ retries: { limit: 5, delay: '1 minute', backoff: 'linear' } } + +// Exponential (recommended): 10s, 20s, 40s, 80s, 160s +{ retries: { limit: 10, delay: '10 seconds', backoff: 'exponential' }, timeout: '5 minutes' } + +// Unlimited retries +{ retries: { limit: Infinity, delay: '1 minute', backoff: 'exponential' } } + +// No retries +{ retries: { limit: 0 } } +``` + +--- + +## Error Handling + +### NonRetryableError + +Force workflow to fail immediately without retrying: + +```typescript +import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; +import { NonRetryableError } from 'cloudflare:workflows'; + +export class MyWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + await step.do('validate input', async () => { + if (!event.payload.userId) { + throw new NonRetryableError('userId is required'); + } + + // Validate user exists + const user = await this.env.DB.prepare( + 'SELECT * FROM users WHERE id = ?' + ).bind(event.payload.userId).first(); + + if (!user) { + // Terminal error - retrying won't help + throw new NonRetryableError('User not found'); + } + + return user; + }); + } +} +``` + +**When to use NonRetryableError:** +- ✅ Authentication/authorization failures +- ✅ Invalid input that won't change +- ✅ Resource doesn't exist (404) +- ✅ Validation errors +- ❌ Network failures (should retry) +- ❌ Rate limits (should retry with backoff) +- ❌ Temporary service outages (should retry) + +--- + +### Catch Errors to Continue Workflow + +Prevent workflow failure by catching optional step errors: + +```typescript +export class MyWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + await step.do('process payment', async () => { /* critical */ }); + + try { + await step.do('send email', async () => { /* optional */ }); + } catch (error) { + await step.do('log failure', async () => { + await this.env.DB.prepare('INSERT INTO failed_emails VALUES (?, ?)').bind(event.payload.userId, error.message).run(); + }); + } + + await step.do('update status', async () => { /* continues */ }); + } +} +``` + +**Graceful Degradation:** +```typescript +let result; +try { + result = await step.do('call primary API', async () => await callPrimaryAPI()); +} catch { + result = await step.do('call backup API', async () => await callBackupAPI()); +} +``` + +--- + +## Triggering Workflows + +**Configure binding (wrangler.jsonc):** +```jsonc +{ + "workflows": [{ + "name": "my-workflow", + "binding": "MY_WORKFLOW", + "class_name": "MyWorkflow", + "script_name": "workflow-worker" // If workflow in different Worker + }] +} +``` + +**Trigger from Worker:** +```typescript +const instance = await env.MY_WORKFLOW.create({ params: { userId: '123' } }); +return Response.json({ id: instance.id, status: await instance.status() }); +``` + +**Instance Management:** +```typescript +const instance = await env.MY_WORKFLOW.get(instanceId); +const status = await instance.status(); // { status: 'running'|'complete'|'errored'|'queued', error, output } +await instance.sendEvent({ type: 'user-action', payload: { action: 'approved' } }); +await instance.pause(); +await instance.resume(); +await instance.terminate(); +``` + +--- + + +## State Persistence + +Workflows automatically persist state returned from `step.do()`: + +**✅ Serializable:** +- Primitives: `string`, `number`, `boolean`, `null` +- Arrays, Objects, Nested structures + +**❌ Non-Serializable:** +- Functions, Symbols, circular references, undefined, class instances + +**Example:** +```typescript +// ✅ Good +const result = await step.do('fetch data', async () => ({ + users: [{ id: 1, name: 'Alice' }], + timestamp: Date.now(), + metadata: null +})); + +// ❌ Bad - function not serializable +const bad = await step.do('bad', async () => ({ data: [1, 2, 3], transform: (x) => x * 2 })); // Throws error! +``` + +**Access State Across Steps:** +```typescript +const userData = await step.do('fetch user', async () => ({ id: 123, email: 'user@example.com' })); +const orderData = await step.do('create order', async () => ({ userId: userData.id, orderId: 'ORD-456' })); +await step.do('send email', async () => sendEmail({ to: userData.email, subject: `Order ${orderData.orderId}` })); +``` + +--- + +## Observability + +### Built-in Metrics (Enhanced in 2025) + +Workflows automatically track: +- **Instance status**: queued, running, complete, errored, paused, waiting +- **Step execution**: start/end times, duration, success/failure +- **Retry history**: attempts, errors, delays +- **Sleep state**: when workflow will wake up +- **Output**: return values from steps and run() +- **CPU time** (GA April 2025): Active processing time per instance for billing insights + +### View Metrics in Dashboard + +Access via Cloudflare dashboard: +1. Workers & Pages +2. Select your workflow +3. View instances and metrics + +**Metrics include:** +- Total instances created +- Success/error rates +- Average execution time +- Step-level performance +- **CPU time consumption** (2025 feature) + +### Programmatic Access + +```typescript +const instance = await env.MY_WORKFLOW.get(instanceId); +const status = await instance.status(); + +console.log(status); +// { +// status: 'complete' | 'running' | 'errored' | 'queued' | 'waiting' | 'unknown', +// error: string | null, +// output: { userId: '123', status: 'processed' } +// } +``` + +**CPU Time Configuration (2025):** +```jsonc +// wrangler.jsonc +{ "limits": { "cpu_ms": 300000 } } // 5 minutes max (default: 30 seconds) +``` + +--- + +## Limits (Updated 2025) + +| Feature | Workers Free | Workers Paid | +|---------|--------------|--------------| +| **Max steps per workflow** | 1,024 | 1,024 | +| **Max state per step** | 1 MiB | 1 MiB | +| **Max state per instance** | 100 MB | 1 GB | +| **Max event payload size** | 1 MiB | 1 MiB | +| **Max sleep/sleepUntil duration** | 365 days | 365 days | +| **Max waitForEvent timeout** | 365 days | 365 days | +| **CPU time per step** | 10 ms | 30 sec (default), 5 min (max) | +| **Duration (wall clock) per step** | Unlimited | Unlimited | +| **Max workflow executions** | 100,000/day | Unlimited | +| **Concurrent instances** | 25 | 10,000 (Oct 2025, up from 4,500) | +| **Instance creation rate** | 100/second | 100/second (Oct 2025, 10x faster) | +| **Max queued instances** | 100,000 | 1,000,000 | +| **Max subrequests per instance** | 50/request | 1,000/request | +| **Retention (completed state)** | 3 days | 30 days | +| **Max Workflow name length** | 64 chars | 64 chars | +| **Max instance ID length** | 100 chars | 100 chars | + +**CRITICAL Notes:** +- `step.sleep()` and `step.sleepUntil()` do NOT count toward 1,024 step limit +- **Waiting instances** (sleeping, retrying, or waiting for events) do NOT count toward concurrency limits +- Instance creation rate increased 10x (October 2025): 100 per 10 seconds → 100 per second +- Max concurrency increased (October 2025): 4,500 → 10,000 concurrent instances +- State persistence limits increased (2025): 128 KB → 1 MiB per step, 100 MB - 1 GB per instance +- Event payload size increased (2025): 128 KB → 1 MiB +- CPU time configurable via `wrangler.jsonc`: `{ "limits": { "cpu_ms": 300000 } }` (5 min max) + +--- + +## Pricing + +**Requires Workers Paid plan** ($5/month) + +**Workflow Executions:** +- First 10,000,000 step executions/month: **FREE** +- After that: **$0.30 per million step executions** + +**What counts as a step execution:** +- Each `step.do()` call +- Each retry of a step +- `step.sleep()`, `step.sleepUntil()`, `step.waitForEvent()` do NOT count + +**Cost examples:** +- Workflow with 5 steps, no retries: **5 step executions** +- Workflow with 3 steps, 1 step retries 2 times: **5 step executions** (3 + 2) +- 10M simple workflows/month (5 steps each): ((50M - 10M) / 1M) × $0.30 = **$12/month** + + +## Troubleshooting + +### Issue: "Cannot perform I/O on behalf of a different request" + +**Cause:** Trying to use I/O objects created in one request context from another request handler + +**Solution:** Always perform I/O within `step.do()` callbacks + +```typescript +// ❌ Bad - I/O outside step +const response = await fetch('https://api.example.com/data'); +const data = await response.json(); + +await step.do('use data', async () => { + // Using data from outside step's I/O context + return data; // This will fail! +}); + +// ✅ Good - I/O inside step +const data = await step.do('fetch data', async () => { + const response = await fetch('https://api.example.com/data'); + return await response.json(); // ✅ Correct +}); +``` + +--- + +### Issue: NonRetryableError behaves differently in dev vs production + +**Known Issue:** Throwing NonRetryableError with empty message in dev mode causes retries, but works correctly in production + +**Workaround:** Always provide a message to NonRetryableError + +```typescript +// ❌ May retry in dev +throw new NonRetryableError(); + +// ✅ Works consistently +throw new NonRetryableError('User not found'); +``` + +**Source:** [workers-sdk#10113](https://github.com/cloudflare/workers-sdk/issues/10113) + +--- + +## Vitest Testing (GA April 2025) + +Workflows support full testing integration via `cloudflare:test` module. + +### Setup + +```bash +npm install -D vitest@latest @cloudflare/vitest-pool-workers@latest +``` + +**vitest.config.ts:** +```typescript +import { defineWorkersConfig } from '@cloudflare/vitest-pool-workers/config'; +export default defineWorkersConfig({ test: { poolOptions: { workers: { miniflare: { bindings: { MY_WORKFLOW: { scriptName: 'workflow' } } } } } } }); +``` + +### Introspection API + +```typescript +import { env, introspectWorkflowInstance } from 'cloudflare:test'; + +it('should complete workflow', async () => { + const instance = await introspectWorkflowInstance(env.MY_WORKFLOW, 'test-123'); + + try { + await instance.modify(async (m) => { + await m.disableSleeps(); // Skip all sleeps + await m.mockStepResult({ name: 'fetch data' }, { users: [{ id: 1 }] }); // Mock step result + await m.mockEvent({ type: 'approval', payload: { approved: true } }); // Send mock event + await m.mockStepError({ name: 'call API' }, new Error('Network timeout'), 1); // Force error once + }); + + await env.MY_WORKFLOW.create({ id: 'test-123' }); + await expect(instance.waitForStatus('complete')).resolves.not.toThrow(); + } finally { + await instance.dispose(); // Cleanup + } +}); +``` + +### Test Modifiers + +- `disableSleeps(steps?)` - Skip sleeps instantly +- `mockStepResult(step, result)` - Mock step.do() result +- `mockStepError(step, error, times?)` - Force step.do() to throw +- `mockEvent(event)` - Send mock event to step.waitForEvent() +- `forceStepTimeout(step, times?)` - Force step.do() timeout +- `forceEventTimeout(step)` - Force step.waitForEvent() timeout + +**Official Docs**: https://developers.cloudflare.com/workers/testing/vitest-integration/ + +--- + +## Related Documentation + +- **Cloudflare Workflows Docs**: https://developers.cloudflare.com/workflows/ +- **Get Started Guide**: https://developers.cloudflare.com/workflows/get-started/guide/ +- **Workers API**: https://developers.cloudflare.com/workflows/build/workers-api/ +- **Vitest Testing**: https://developers.cloudflare.com/workers/testing/vitest-integration/ +- **Sleeping and Retrying**: https://developers.cloudflare.com/workflows/build/sleeping-and-retrying/ +- **Events and Parameters**: https://developers.cloudflare.com/workflows/build/events-and-parameters/ +- **Limits**: https://developers.cloudflare.com/workflows/reference/limits/ +- **Pricing**: https://developers.cloudflare.com/workflows/platform/pricing/ +- **Changelog**: https://developers.cloudflare.com/workflows/reference/changelog/ +- **MCP Tool**: Use `mcp__cloudflare-docs__search_cloudflare_documentation` for latest docs + +--- + +**Last Updated**: 2025-11-25 +**Version**: 1.0.0 +**Maintainer**: Jeremy Dawes | jeremy@jezweb.net diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..edb8102 --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,77 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:jezweb/claude-skills:skills/cloudflare-workflows", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "dc24af870e89687c7c213c63d93b5328717e9c39", + "treeHash": "928c2023d9f2b055674faca59bf755b330124301b94c312810ffb099a294812e", + "generatedAt": "2025-11-28T10:18:57.228742Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "cloudflare-workflows", + "description": "Build durable, long-running workflows on Cloudflare Workers with automatic retries, state persistence, and multi-step orchestration. Supports step.do, step.sleep, step.waitForEvent, and runs for hours to days. Use when: creating long-running workflows, implementing retry logic, building event-driven processes, coordinating API calls, scheduling multi-step tasks, or troubleshooting NonRetryableError, I/O context, serialization errors, or workflow execution failures.", + "version": "1.0.0" + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "950015c9480bf32195e22418aaae3544642911a799ff3112ed314b5d6e854b07" + }, + { + "path": "SKILL.md", + "sha256": "e6cd4e70d38e2ee2bf1eb3381f4324cacb85fe73db3679b6e14b835e5723fec2" + }, + { + "path": "references/common-issues.md", + "sha256": "7964a10795380621d2ae6c3f6d8cdff81ac0ac8d876142331160446558e3c0e6" + }, + { + "path": "references/workflow-patterns.md", + "sha256": "099ae2b8a1d190a6b8d6b74a1e464a96655d19059eceb075da0e60690380dcaf" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "1cda1aa87cf945ab9220c0b0b3cb2e41986a3de043d25b1978704a689604aab8" + }, + { + "path": "templates/wrangler-workflows-config.jsonc", + "sha256": "6eae31c64dcb9ba03e8cc0de42c38d138fcae17279d0eb4766e82bf1ed34c684" + }, + { + "path": "templates/scheduled-workflow.ts", + "sha256": "ca55491a2ee0d97f791d6017c56a030090718baf6e21809d15339b82e9989881" + }, + { + "path": "templates/workflow-with-retries.ts", + "sha256": "f80370e3bdd64a34efc7d1ae506779806e32499dfc009f8002a49cccd8cb969d" + }, + { + "path": "templates/worker-trigger.ts", + "sha256": "bead780a84a8e3dae466b0008ccdf6bfda7eebd34ba70906dcb6e356bb82fc4d" + }, + { + "path": "templates/basic-workflow.ts", + "sha256": "e32b3b5b671a6897af32b7f7a5bf640a30f9b0c9094ba5e3822cb5ad7fd5bdb5" + }, + { + "path": "templates/workflow-with-events.ts", + "sha256": "7644d8a6a9600365f86045d7bc391ff6cc3eca755f152cfc2a0dded9cfa5dc78" + } + ], + "dirSha256": "928c2023d9f2b055674faca59bf755b330124301b94c312810ffb099a294812e" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/references/common-issues.md b/references/common-issues.md new file mode 100644 index 0000000..30d7b50 --- /dev/null +++ b/references/common-issues.md @@ -0,0 +1,413 @@ +# Cloudflare Workflows - Common Issues + +**Last Updated**: 2025-10-22 + +This document details all known issues with Cloudflare Workflows and their solutions. + +--- + +## Issue #1: I/O Context Error + +**Error Message:** +``` +Cannot perform I/O on behalf of a different request +``` + +**Description:** +When trying to use I/O objects (like fetch responses, file handles, etc.) created in one request context from a different request's handler, Cloudflare Workers throws this error. This is a fundamental Workers platform limitation. + +**Root Cause:** +I/O objects are bound to the request context that created them. Workflows create a new execution context for each step, so I/O must happen within the step's callback. + +**Prevention:** + +❌ **Bad - I/O outside step:** +```typescript +// This will fail! +const response = await fetch('https://api.example.com/data'); +const data = await response.json(); + +await step.do('use data', async () => { + // Trying to use data from outside step's context + return data; // ❌ Error! +}); +``` + +✅ **Good - I/O inside step:** +```typescript +const data = await step.do('fetch data', async () => { + const response = await fetch('https://api.example.com/data'); + return await response.json(); // ✅ Correct +}); +``` + +**Workaround:** +Always perform all I/O operations (fetch, KV reads, D1 queries, R2 operations) within `step.do()` callbacks. + +**Source:** Cloudflare Workers platform limitation + +--- + +## Issue #2: NonRetryableError Behaves Differently in Dev vs Production + +**Error Message:** +``` +(No specific error - workflow retries when it shouldn't) +``` + +**Description:** +When throwing a `NonRetryableError` with an empty message in development mode (`wrangler dev`), the workflow incorrectly retries the failed step. In production, it correctly exits without retrying. + +**Root Cause:** +Bug in the development environment handling of empty NonRetryableError messages. + +**Prevention:** + +❌ **Bad - Empty message:** +```typescript +import { NonRetryableError } from 'cloudflare:workflows'; + +// May retry in dev mode +throw new NonRetryableError(); +``` + +✅ **Good - Always provide message:** +```typescript +import { NonRetryableError } from 'cloudflare:workflows'; + +// Works consistently in dev and production +throw new NonRetryableError('User not found'); +throw new NonRetryableError('Invalid authentication credentials'); +throw new NonRetryableError('Amount exceeds limit'); +``` + +**Workaround:** +Always provide a descriptive message when throwing NonRetryableError. + +**Source:** [cloudflare/workers-sdk#10113](https://github.com/cloudflare/workers-sdk/issues/10113) +**Status:** Reported July 2025, not yet fixed + +--- + +## Issue #3: WorkflowEvent Export Not Found + +**Error Message:** +``` +The requested module 'cloudflare:workers' does not provide an export named 'WorkflowEvent' +``` + +**Description:** +TypeScript cannot find the `WorkflowEvent` export from the `cloudflare:workers` module. This usually happens with outdated type definitions. + +**Root Cause:** +- Outdated `@cloudflare/workers-types` package +- Incorrect import statement +- Missing types in tsconfig.json + +**Prevention:** + +✅ **Ensure latest types installed:** +```bash +npm install -D @cloudflare/workers-types@latest +``` + +✅ **Correct import:** +```typescript +import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; +import { NonRetryableError } from 'cloudflare:workflows'; +``` + +✅ **Correct tsconfig.json:** +```json +{ + "compilerOptions": { + "types": ["@cloudflare/workers-types/2023-07-01"], + "moduleResolution": "bundler" + } +} +``` + +**Workaround:** +1. Update workers types: `npm install -D @cloudflare/workers-types@latest` +2. Run type generation: `npx wrangler types` +3. Restart TypeScript server in your editor + +**Source:** Community reports, package versioning issues +**Latest Working Version:** @cloudflare/workers-types@4.20251014.0 (verified 2025-10-22) + +--- + +## Issue #4: Serialization Error - Non-Serializable Return Values + +**Error Message:** +``` +Error: Could not serialize return value +(or workflow hangs without clear error) +``` + +**Description:** +Attempting to return non-serializable values from `step.do()` or `run()` methods causes serialization failures. The workflow instance may error or hang. + +**Root Cause:** +Workflows persist state between steps by serializing return values. Only JSON-serializable types are supported. + +**Prevention:** + +❌ **Bad - Non-serializable types:** +```typescript +// ❌ Function +await step.do('bad example', async () => { + return { + data: [1, 2, 3], + transform: (x) => x * 2 // ❌ Function not serializable + }; +}); + +// ❌ Circular reference +await step.do('bad example 2', async () => { + const obj: any = { name: 'test' }; + obj.self = obj; // ❌ Circular reference + return obj; +}); + +// ❌ Symbol +await step.do('bad example 3', async () => { + return { + id: Symbol('unique'), // ❌ Symbol not serializable + data: 'test' + }; +}); + +// ❌ undefined (use null instead) +await step.do('bad example 4', async () => { + return { + value: undefined // ❌ undefined not serializable + }; +}); +``` + +✅ **Good - Only serializable types:** +```typescript +await step.do('good example', async () => { + return { + // ✅ Primitives + string: 'value', + number: 123, + boolean: true, + nullValue: null, + + // ✅ Arrays + array: [1, 2, 3], + + // ✅ Objects + nested: { + data: 'test', + items: [{ id: 1 }, { id: 2 }] + } + }; +}); +``` + +✅ **Convert class instances to plain objects:** +```typescript +class User { + constructor(public id: string, public name: string) {} + + toJSON() { + return { id: this.id, name: this.name }; + } +} + +await step.do('serialize class', async () => { + const user = new User('123', 'Alice'); + + // ✅ Convert to plain object + return user.toJSON(); // { id: '123', name: 'Alice' } +}); +``` + +**Workaround:** +- Only return primitives, arrays, and plain objects +- Convert class instances to plain objects before returning +- Use `null` instead of `undefined` +- Avoid circular references + +**Source:** Cloudflare Workflows documentation +**Reference:** [Workflows Workers API](https://developers.cloudflare.com/workflows/build/workers-api/) + +--- + +## Issue #5: Testing Workflows in CI Environments + +**Error Message:** +``` +(Tests pass locally but fail in CI) +``` + +**Description:** +Tests that use `vitest-pool-workers` to test workflows work reliably in local development but fail inconsistently in CI environments (GitHub Actions, GitLab CI, etc.). + +**Root Cause:** +- Timing issues in CI environments +- Resource constraints in CI runners +- Race conditions in test setup/teardown + +**Prevention:** + +✅ **Increase timeouts in CI:** +```typescript +// vitest.config.ts +import { defineConfig } from 'vitest/config'; + +export default defineConfig({ + test: { + testTimeout: 30000, // Increase from default 5000ms + poolOptions: { + workers: { + wrangler: { configPath: './wrangler.jsonc' }, + }, + }, + }, +}); +``` + +✅ **Add retry logic for flaky tests:** +```typescript +describe('Workflow tests', () => { + it.retry(3)('should complete workflow', async () => { + // Test code + }); +}); +``` + +✅ **Use proper test isolation:** +```typescript +import { beforeEach, afterEach } from 'vitest'; + +let instance: WorkflowInstance; + +beforeEach(async () => { + instance = await env.MY_WORKFLOW.create({ + params: { userId: '123' } + }); +}); + +afterEach(async () => { + if (instance) { + try { + await instance.terminate(); + } catch (error) { + // Instance may already be terminated + } + } +}); +``` + +**Workaround:** +1. Increase test timeouts for CI +2. Add retry logic for flaky tests +3. Use proper test isolation +4. Consider mocking workflows in unit tests, testing real workflows in integration tests + +**Source:** [cloudflare/workers-sdk#10600](https://github.com/cloudflare/workers-sdk/issues/10600) +**Status:** Ongoing investigation + +--- + +## Additional Troubleshooting Tips + +### Workflow Instance Stuck in "Running" State + +**Possible Causes:** +1. Step is sleeping for long duration +2. Step is waiting for event that never arrives +3. Step is retrying with long backoff + +**Solution:** +```bash +# Check detailed instance status +npx wrangler workflows instances describe my-workflow + +# Look for: +# - Sleep state (shows wake time) +# - waitForEvent state (shows event type and timeout) +# - Retry history (shows attempts and delays) +``` + +--- + +### Step Returns Undefined + +**Cause:** Missing return statement in step callback + +**Solution:** +```typescript +// ❌ Bad - no return +const result = await step.do('get data', async () => { + const data = await fetchData(); + // Missing return! +}); +console.log(result); // undefined + +// ✅ Good - explicit return +const result = await step.do('get data', async () => { + const data = await fetchData(); + return data; // ✅ Return the value +}); +``` + +--- + +### Payload Too Large Error + +**Error:** +``` +Payload size exceeds limit +``` + +**Cause:** Workflow parameters or step outputs exceed 128 KB + +**Solution:** +```typescript +// ❌ Bad - large payload +await env.MY_WORKFLOW.create({ + params: { + largeData: hugeArray // >128 KB + } +}); + +// ✅ Good - store in R2/KV, pass reference +const key = `workflow-data/${crypto.randomUUID()}`; +await env.MY_BUCKET.put(key, JSON.stringify(hugeArray)); + +await env.MY_WORKFLOW.create({ + params: { + dataKey: key // Just pass the key + } +}); +``` + +--- + +## Getting Help + +If you encounter issues not listed here: + +1. **Check Cloudflare Status**: https://www.cloudflarestatus.com/ +2. **Search GitHub Issues**: https://github.com/cloudflare/workers-sdk/issues +3. **Cloudflare Discord**: https://discord.gg/cloudflaredev +4. **Cloudflare Community**: https://community.cloudflare.com/ +5. **Official Docs**: https://developers.cloudflare.com/workflows/ + +When reporting issues, include: +- Workflow code (sanitized) +- Wrangler configuration +- Error messages and stack traces +- Workflow instance ID +- Steps to reproduce +- Expected vs actual behavior + +--- + +**Last Updated**: 2025-10-22 +**Maintainer**: Jeremy Dawes | jeremy@jezweb.net diff --git a/references/workflow-patterns.md b/references/workflow-patterns.md new file mode 100644 index 0000000..0115298 --- /dev/null +++ b/references/workflow-patterns.md @@ -0,0 +1,585 @@ +# Cloudflare Workflows - Production Patterns + +**Last Updated**: 2025-10-22 + +This document provides battle-tested patterns for building production-ready Cloudflare Workflows. + +--- + +## Table of Contents + +1. [Idempotency Patterns](#idempotency-patterns) +2. [Error Handling Patterns](#error-handling-patterns) +3. [Long-Running Process Patterns](#long-running-process-patterns) +4. [Human-in-the-Loop Patterns](#human-in-the-loop-patterns) +5. [Workflow Chaining Patterns](#workflow-chaining-patterns) +6. [Testing Patterns](#testing-patterns) +7. [Monitoring Patterns](#monitoring-patterns) + +--- + +## Idempotency Patterns + +### Pattern 1: Idempotency Keys + +**Problem:** Workflow steps may execute multiple times due to retries. + +**Solution:** Use idempotency keys to ensure operations execute only once. + +```typescript +export class PaymentWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { orderId, amount } = event.payload; + + // Generate idempotency key from workflow instance ID + step name + const idempotencyKey = `${event.instanceId}-charge-payment`; + + const paymentResult = await step.do('charge payment', async () => { + // Check if already processed + const existing = await this.env.KV.get(`payment:${idempotencyKey}`); + if (existing) { + console.log('Payment already processed, returning cached result'); + return JSON.parse(existing); + } + + // Process payment + const response = await fetch('https://payment-gateway.example.com/charge', { + method: 'POST', + headers: { + 'Idempotency-Key': idempotencyKey // Payment gateway checks this + }, + body: JSON.stringify({ orderId, amount }) + }); + + const result = await response.json(); + + // Cache result + await this.env.KV.put( + `payment:${idempotencyKey}`, + JSON.stringify(result), + { expirationTtl: 86400 } // 24 hours + }); + + return result; + }); + + return { orderId, transactionId: paymentResult.transactionId }; + } +} +``` + +--- + +### Pattern 2: Database Upsert for Idempotency + +```typescript +await step.do('create order', async () => { + // Use INSERT OR REPLACE to make idempotent + await this.env.DB.prepare(` + INSERT INTO orders (id, user_id, amount, status, created_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + user_id = excluded.user_id, + amount = excluded.amount, + updated_at = CURRENT_TIMESTAMP + `).bind( + orderId, + userId, + amount, + 'pending', + new Date().toISOString() + ).run(); + + return { orderId }; +}); +``` + +--- + +## Error Handling Patterns + +### Pattern 1: Categorize Errors for Retry Logic + +```typescript +async function shouldRetry(error: Error): Promise { + // Don't retry on client errors (4xx) + if (error.message.includes('400') || + error.message.includes('401') || + error.message.includes('403') || + error.message.includes('404')) { + return false; + } + + // Retry on server errors (5xx) and network errors + return true; +} + +await step.do('call API', async () => { + try { + const response = await fetch('https://api.example.com/data'); + + if (!response.ok) { + const error = new Error(`API error: ${response.status}`); + + if (!await shouldRetry(error)) { + throw new NonRetryableError(error.message); + } + + throw error; // Will retry + } + + return await response.json(); + } catch (error) { + if (error instanceof NonRetryableError) { + throw error; + } + + // Network error - retry + throw error; + } +}); +``` + +--- + +### Pattern 2: Circuit Breaker + +```typescript +export class CircuitBreaker { + constructor( + private kv: KVNamespace, + private serviceName: string, + private threshold: number = 5, + private resetTime: number = 60000 // 1 minute + ) {} + + async call(fn: () => Promise): Promise { + const key = `circuit:${this.serviceName}`; + const state = await this.kv.get(key, 'json') as { + failures: number; + lastFailure: number; + } | null; + + // Check if circuit is open + if (state && state.failures >= this.threshold) { + const elapsed = Date.now() - state.lastFailure; + + if (elapsed < this.resetTime) { + throw new NonRetryableError( + `Circuit breaker open for ${this.serviceName}` + ); + } + } + + try { + const result = await fn(); + + // Reset on success + await this.kv.delete(key); + + return result; + } catch (error) { + // Increment failure count + const newState = { + failures: (state?.failures || 0) + 1, + lastFailure: Date.now() + }; + + await this.kv.put(key, JSON.stringify(newState), { + expirationTtl: this.resetTime / 1000 + }); + + throw error; + } + } +} + +// Usage +export class MyWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const circuitBreaker = new CircuitBreaker(this.env.KV, 'external-api'); + + await step.do('call external API', async () => { + return await circuitBreaker.call(async () => { + const response = await fetch('https://external-api.example.com/data'); + return await response.json(); + }); + }); + } +} +``` + +--- + +### Pattern 3: Graceful Degradation + +```typescript +await step.do('fetch user preferences', async () => { + try { + const response = await fetch(`https://api.example.com/users/${userId}/preferences`); + if (!response.ok) throw new Error('Failed to fetch preferences'); + return await response.json(); + } catch (error) { + console.warn('Failed to fetch preferences, using defaults:', error); + + // Fallback to defaults + return { + theme: 'light', + language: 'en', + notifications: true + }; + } +}); +``` + +--- + +## Long-Running Process Patterns + +### Pattern 1: Polling with Exponential Backoff + +```typescript +export class VideoProcessingWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { videoId } = event.payload; + + // Submit video for processing + const jobId = await step.do('submit video', async () => { + const response = await fetch('https://processor.example.com/jobs', { + method: 'POST', + body: JSON.stringify({ videoId }) + }); + const data = await response.json(); + return data.jobId; + }); + + // Poll for completion with exponential backoff + let complete = false; + let attempt = 0; + const maxAttempts = 20; + + while (!complete && attempt < maxAttempts) { + // Wait with exponential backoff: 10s, 20s, 40s, ... + const delay = Math.min(10 * Math.pow(2, attempt), 300); // Max 5 minutes + await step.sleep(`wait attempt ${attempt}`, `${delay} seconds`); + + const status = await step.do(`check status attempt ${attempt}`, async () => { + const response = await fetch( + `https://processor.example.com/jobs/${jobId}/status` + ); + return await response.json(); + }); + + if (status.state === 'complete') { + complete = true; + } else if (status.state === 'failed') { + throw new NonRetryableError(`Processing failed: ${status.error}`); + } + + attempt++; + } + + if (!complete) { + throw new Error('Processing timeout after maximum attempts'); + } + + return { videoId, jobId, status: 'complete' }; + } +} +``` + +--- + +### Pattern 2: Progress Tracking + +```typescript +export class DataMigrationWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { totalRecords, batchSize } = event.payload; + const batches = Math.ceil(totalRecords / batchSize); + + for (let i = 0; i < batches; i++) { + const progress = await step.do(`migrate batch ${i}`, async () => { + const offset = i * batchSize; + + // Migrate batch + await this.migrateBatch(offset, batchSize); + + // Update progress in DB + const percentage = Math.round(((i + 1) / batches) * 100); + await this.env.DB.prepare(` + UPDATE migration_jobs + SET progress = ?, updated_at = ? + WHERE id = ? + `).bind(percentage, new Date().toISOString(), event.payload.jobId).run(); + + return { batch: i + 1, total: batches, percentage }; + }); + + console.log(`Progress: ${progress.percentage}%`); + + // Small delay between batches to avoid overwhelming database + if (i < batches - 1) { + await step.sleep(`pause before batch ${i + 1}`, '1 second'); + } + } + + return { status: 'complete', batches }; + } + + private async migrateBatch(offset: number, limit: number) { + // Migration logic + } +} +``` + +--- + +## Human-in-the-Loop Patterns + +### Pattern 1: Approval with Timeout and Escalation + +```typescript +export class ApprovalWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { requestId, amount } = event.payload; + + // Send to primary approver + await step.do('notify primary approver', async () => { + await this.sendApprovalRequest('manager@example.com', requestId); + }); + + // Wait 48 hours for approval + let approved: boolean; + let approver: string; + + try { + const decision = await step.waitForEvent( + 'wait for primary approval', + { type: 'approval-decision', timeout: '48 hours' } + ); + + approved = decision.approved; + approver = decision.approverId; + } catch (error) { + // Timeout - escalate to senior manager + console.log('Primary approval timeout, escalating'); + + await step.do('notify senior approver', async () => { + await this.sendApprovalRequest('senior-manager@example.com', requestId); + }); + + // Wait another 24 hours + const escalatedDecision = await step.waitForEvent( + 'wait for escalated approval', + { type: 'approval-decision', timeout: '24 hours' } + ); + + approved = escalatedDecision.approved; + approver = escalatedDecision.approverId; + } + + if (approved) { + await step.do('execute approved action', async () => { + // Execute the action + }); + } + + return { requestId, approved, approver }; + } + + private async sendApprovalRequest(to: string, requestId: string) { + // Send notification + } +} +``` + +--- + +## Workflow Chaining Patterns + +### Pattern 1: Parent-Child Workflows + +```typescript +export class OrderWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { orderId } = event.payload; + + // Step 1: Process payment (separate workflow) + const paymentWorkflow = await step.do('start payment workflow', async () => { + const instance = await this.env.PAYMENT_WORKFLOW.create({ + params: { orderId, amount: event.payload.amount } + }); + return { instanceId: instance.id }; + }); + + // Step 2: Wait for payment to complete + let paymentComplete = false; + + while (!paymentComplete) { + await step.sleep('wait for payment', '30 seconds'); + + const paymentStatus = await step.do('check payment status', async () => { + const instance = await this.env.PAYMENT_WORKFLOW.get( + paymentWorkflow.instanceId + ); + return await instance.status(); + }); + + if (paymentStatus.status === 'complete') { + paymentComplete = true; + } else if (paymentStatus.status === 'errored') { + throw new Error(`Payment failed: ${paymentStatus.error}`); + } + } + + // Step 3: Start fulfillment workflow + await step.do('start fulfillment workflow', async () => { + await this.env.FULFILLMENT_WORKFLOW.create({ + params: { orderId } + }); + }); + + return { orderId, status: 'processing' }; + } +} +``` + +--- + +## Testing Patterns + +### Pattern 1: Mock External APIs + +```typescript +import { describe, it, expect, beforeEach } from 'vitest'; +import { unstable_dev } from 'wrangler'; + +describe('PaymentWorkflow', () => { + let worker; + + beforeEach(async () => { + worker = await unstable_dev('src/index.ts', { + experimental: { disableExperimentalWarning: true } + }); + }); + + it('should process payment successfully', async () => { + // Mock fetch to return success + globalThis.fetch = async (url: string) => { + if (url.includes('payment-gateway')) { + return new Response(JSON.stringify({ + transactionId: 'TXN-123', + status: 'success' + })); + } + return new Response('Not found', { status: 404 }); + }; + + const response = await worker.fetch('/workflows/create', { + method: 'POST', + body: JSON.stringify({ + orderId: 'ORD-123', + amount: 99.99 + }) + }); + + const data = await response.json(); + expect(data.id).toBeDefined(); + }); +}); +``` + +--- + +## Monitoring Patterns + +### Pattern 1: Structured Logging + +```typescript +export class MyWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + this.log('info', 'Workflow started', { + instanceId: event.instanceId, + params: event.payload + }); + + try { + await step.do('process data', async () => { + this.log('info', 'Processing data', { userId: event.payload.userId }); + // Process + return { processed: true }; + }); + + this.log('info', 'Workflow completed successfully', { + instanceId: event.instanceId + }); + } catch (error) { + this.log('error', 'Workflow failed', { + instanceId: event.instanceId, + error: error instanceof Error ? error.message : 'Unknown error' + }); + throw error; + } + } + + private log(level: string, message: string, data: any) { + console.log(JSON.stringify({ + level, + message, + timestamp: new Date().toISOString(), + ...data + })); + } +} +``` + +--- + +### Pattern 2: Metrics Tracking + +```typescript +await step.do('track metrics', async () => { + const metrics = { + workflowId: event.instanceId, + stepName: 'payment-processing', + duration: performance.now() - startTime, + status: 'success', + timestamp: new Date().toISOString() + }; + + // Store in Analytics Engine + await this.env.ANALYTICS.writeDataPoint(metrics); + + return metrics; +}); +``` + +--- + +## Best Practices Summary + +### Always Do + +1. **Use idempotency keys** for external API calls +2. **Categorize errors** - retry on transient failures, fail fast on terminal errors +3. **Log structured data** - JSON logs for easy querying +4. **Track progress** - update database for long-running processes +5. **Use exponential backoff** - for polling and retries +6. **Test workflows** - unit tests with mocked dependencies +7. **Monitor metrics** - track success rates, durations, errors + +### Never Do + +1. **Don't retry non-idempotent operations infinitely** - use retry limits +2. **Don't ignore timeout errors** - handle gracefully with fallbacks +3. **Don't block on external events without timeout** - always set timeout +4. **Don't assume steps execute in order** - each step is independent +5. **Don't return non-serializable values** - only JSON-compatible types +6. **Don't store sensitive data in workflow state** - use KV/D1 instead +7. **Don't forget to clean up resources** - terminate unused workflow instances + +--- + +**Last Updated**: 2025-10-22 +**Maintainer**: Jeremy Dawes | jeremy@jezweb.net diff --git a/templates/basic-workflow.ts b/templates/basic-workflow.ts new file mode 100644 index 0000000..2619218 --- /dev/null +++ b/templates/basic-workflow.ts @@ -0,0 +1,136 @@ +/** + * Basic Cloudflare Workflow Example + * + * Demonstrates: + * - WorkflowEntrypoint class + * - step.do() for executing work + * - step.sleep() for delays + * - Accessing environment bindings + * - Returning state from workflow + */ + +import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; + +// Define environment bindings +type Env = { + MY_WORKFLOW: Workflow; + // Add your bindings here: + // MY_KV: KVNamespace; + // DB: D1Database; + // MY_BUCKET: R2Bucket; +}; + +// Define workflow parameters +type Params = { + userId: string; + email: string; +}; + +/** + * Basic Workflow + * + * Three-step workflow that: + * 1. Fetches user data + * 2. Processes user data + * 3. Sends notification + */ +export class BasicWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + // Access parameters from event.payload + const { userId, email } = event.payload; + + console.log(`Starting workflow for user ${userId}`); + + // Step 1: Fetch user data + const userData = await step.do('fetch user data', async () => { + // Example: Fetch from external API + const response = await fetch(`https://api.example.com/users/${userId}`); + const data = await response.json(); + + return { + id: data.id, + name: data.name, + email: data.email, + preferences: data.preferences + }; + }); + + console.log(`Fetched user: ${userData.name}`); + + // Step 2: Process user data + const processedData = await step.do('process user data', async () => { + // Example: Perform some computation + return { + userId: userData.id, + processedAt: new Date().toISOString(), + status: 'processed' + }; + }); + + // Step 3: Wait before sending notification + await step.sleep('wait before notification', '5 minutes'); + + // Step 4: Send notification + await step.do('send notification', async () => { + // Example: Send email or push notification + await fetch('https://api.example.com/notifications', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + to: email, + subject: 'Processing Complete', + body: `Your data has been processed at ${processedData.processedAt}` + }) + }); + + return { sent: true, timestamp: Date.now() }; + }); + + // Return final state (must be serializable) + return { + userId, + status: 'complete', + processedAt: processedData.processedAt + }; + } +} + +/** + * Worker that triggers the workflow + */ +export default { + async fetch(req: Request, env: Env): Promise { + const url = new URL(req.url); + + // Handle favicon + if (url.pathname.startsWith('/favicon')) { + return Response.json({}, { status: 404 }); + } + + // Get instance status if ID provided + const instanceId = url.searchParams.get('instanceId'); + if (instanceId) { + const instance = await env.MY_WORKFLOW.get(instanceId); + const status = await instance.status(); + + return Response.json({ + id: instanceId, + status + }); + } + + // Create new workflow instance + const instance = await env.MY_WORKFLOW.create({ + params: { + userId: '123', + email: 'user@example.com' + } + }); + + return Response.json({ + id: instance.id, + details: await instance.status(), + statusUrl: `${url.origin}?instanceId=${instance.id}` + }); + } +}; diff --git a/templates/scheduled-workflow.ts b/templates/scheduled-workflow.ts new file mode 100644 index 0000000..1de03fe --- /dev/null +++ b/templates/scheduled-workflow.ts @@ -0,0 +1,252 @@ +/** + * Scheduled Workflow Example + * + * Demonstrates: + * - step.sleep() for relative delays + * - step.sleepUntil() for absolute times + * - Scheduling daily/weekly tasks + * - Long-running processes + */ + +import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; + +type Env = { + MY_WORKFLOW: Workflow; + DB: D1Database; +}; + +type ReportParams = { + reportType: 'daily' | 'weekly' | 'monthly'; + recipients: string[]; +}; + +/** + * Scheduled Reporting Workflow + * + * Generates and sends reports on a schedule: + * - Daily reports at 9am UTC + * - Weekly reports on Monday 9am UTC + * - Monthly reports on 1st of month 9am UTC + */ +export class ScheduledReportWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { reportType, recipients } = event.payload; + + if (reportType === 'daily') { + await this.runDailyReport(step, recipients); + } else if (reportType === 'weekly') { + await this.runWeeklyReport(step, recipients); + } else if (reportType === 'monthly') { + await this.runMonthlyReport(step, recipients); + } + + return { reportType, status: 'complete' }; + } + + /** + * Daily report - runs every day at 9am UTC + */ + private async runDailyReport(step: WorkflowStep, recipients: string[]) { + // Calculate next 9am UTC + const now = new Date(); + const next9am = new Date(); + next9am.setUTCDate(next9am.getUTCDate() + 1); + next9am.setUTCHours(9, 0, 0, 0); + + // Sleep until tomorrow 9am + await step.sleepUntil('wait until 9am tomorrow', next9am); + + // Generate report + const report = await step.do('generate daily report', async () => { + const yesterday = new Date(now); + yesterday.setDate(yesterday.getDate() - 1); + const dateStr = yesterday.toISOString().split('T')[0]; + + const results = await this.env.DB.prepare( + 'SELECT * FROM daily_metrics WHERE date = ?' + ).bind(dateStr).all(); + + return { + date: dateStr, + type: 'daily', + metrics: results.results + }; + }); + + // Send report + await step.do('send daily report', async () => { + await this.sendReport(report, recipients); + return { sent: true }; + }); + } + + /** + * Weekly report - runs every Monday at 9am UTC + */ + private async runWeeklyReport(step: WorkflowStep, recipients: string[]) { + // Calculate next Monday 9am UTC + const nextMonday = new Date(); + const daysUntilMonday = (1 + 7 - nextMonday.getDay()) % 7 || 7; + nextMonday.setDate(nextMonday.getDate() + daysUntilMonday); + nextMonday.setUTCHours(9, 0, 0, 0); + + await step.sleepUntil('wait until Monday 9am', nextMonday); + + // Generate report + const report = await step.do('generate weekly report', async () => { + const lastWeek = new Date(); + lastWeek.setDate(lastWeek.getDate() - 7); + + const results = await this.env.DB.prepare( + 'SELECT * FROM daily_metrics WHERE date >= ? ORDER BY date DESC' + ).bind(lastWeek.toISOString().split('T')[0]).all(); + + return { + weekStart: lastWeek.toISOString().split('T')[0], + type: 'weekly', + metrics: results.results + }; + }); + + // Send report + await step.do('send weekly report', async () => { + await this.sendReport(report, recipients); + return { sent: true }; + }); + } + + /** + * Monthly report - runs on 1st of each month at 9am UTC + */ + private async runMonthlyReport(step: WorkflowStep, recipients: string[]) { + // Calculate first day of next month at 9am UTC + const firstOfNextMonth = new Date(); + firstOfNextMonth.setUTCMonth(firstOfNextMonth.getUTCMonth() + 1, 1); + firstOfNextMonth.setUTCHours(9, 0, 0, 0); + + await step.sleepUntil('wait until 1st of month 9am', firstOfNextMonth); + + // Generate report + const report = await step.do('generate monthly report', async () => { + const lastMonth = new Date(); + lastMonth.setMonth(lastMonth.getMonth() - 1); + const monthStart = new Date(lastMonth.getFullYear(), lastMonth.getMonth(), 1); + const monthEnd = new Date(lastMonth.getFullYear(), lastMonth.getMonth() + 1, 0); + + const results = await this.env.DB.prepare( + 'SELECT * FROM daily_metrics WHERE date >= ? AND date <= ? ORDER BY date DESC' + ).bind( + monthStart.toISOString().split('T')[0], + monthEnd.toISOString().split('T')[0] + ).all(); + + return { + month: lastMonth.toISOString().substring(0, 7), // YYYY-MM + type: 'monthly', + metrics: results.results + }; + }); + + // Send report + await step.do('send monthly report', async () => { + await this.sendReport(report, recipients); + return { sent: true }; + }); + } + + /** + * Send report via email + */ + private async sendReport(report: any, recipients: string[]) { + const subject = `${report.type.charAt(0).toUpperCase() + report.type.slice(1)} Report`; + + await fetch('https://api.example.com/send-email', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + to: recipients, + subject, + body: this.formatReport(report) + }) + }); + } + + /** + * Format report data as HTML + */ + private formatReport(report: any): string { + // Format report metrics as HTML + return ` +

${report.type} Report

+

Period: ${report.date || report.weekStart || report.month}

+
${JSON.stringify(report.metrics, null, 2)}
+ `; + } +} + +/** + * Example: Reminder Workflow with Multiple Delays + */ +export class ReminderWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent<{ userId: string; message: string }>, step: WorkflowStep) { + const { userId, message } = event.payload; + + // Send initial reminder + await step.do('send initial reminder', async () => { + await this.sendReminder(userId, message); + return { sent: true }; + }); + + // Wait 1 hour + await step.sleep('wait 1 hour', '1 hour'); + + // Send second reminder + await step.do('send second reminder', async () => { + await this.sendReminder(userId, `Reminder: ${message}`); + return { sent: true }; + }); + + // Wait 1 day + await step.sleep('wait 1 day', '1 day'); + + // Send final reminder + await step.do('send final reminder', async () => { + await this.sendReminder(userId, `Final reminder: ${message}`); + return { sent: true }; + }); + + return { userId, remindersSent: 3 }; + } + + private async sendReminder(userId: string, message: string) { + await fetch(`https://api.example.com/send-notification`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ userId, message }) + }); + } +} + +export default { + async fetch(req: Request, env: Env): Promise { + const url = new URL(req.url); + + if (url.pathname.startsWith('/favicon')) { + return Response.json({}, { status: 404 }); + } + + // Create daily report workflow + const instance = await env.MY_WORKFLOW.create({ + params: { + reportType: 'daily', + recipients: ['admin@example.com', 'team@example.com'] + } + }); + + return Response.json({ + id: instance.id, + status: await instance.status(), + message: 'Daily report workflow scheduled' + }); + } +}; diff --git a/templates/worker-trigger.ts b/templates/worker-trigger.ts new file mode 100644 index 0000000..e675e66 --- /dev/null +++ b/templates/worker-trigger.ts @@ -0,0 +1,287 @@ +/** + * Worker that Triggers and Manages Workflows + * + * Demonstrates: + * - Creating workflow instances + * - Querying workflow status + * - Sending events to workflows + * - Pausing/resuming workflows + * - Terminating workflows + */ + +import { Hono } from 'hono'; + +type Bindings = { + MY_WORKFLOW: Workflow; + DB: D1Database; +}; + +const app = new Hono<{ Bindings: Bindings }>(); + +/** + * Create new workflow instance + */ +app.post('/workflows/create', async (c) => { + const body = await c.req.json<{ + userId: string; + email: string; + [key: string]: any; + }>(); + + try { + // Create workflow instance with parameters + const instance = await c.env.MY_WORKFLOW.create({ + params: body + }); + + // Optionally store instance ID for later reference + await c.env.DB.prepare(` + INSERT INTO workflow_instances (id, user_id, status, created_at) + VALUES (?, ?, ?, ?) + `).bind( + instance.id, + body.userId, + 'queued', + new Date().toISOString() + ).run(); + + return c.json({ + id: instance.id, + status: await instance.status(), + createdAt: new Date().toISOString() + }, 201); + } catch (error) { + return c.json({ + error: 'Failed to create workflow', + message: error instanceof Error ? error.message : 'Unknown error' + }, 500); + } +}); + +/** + * Get workflow instance status + */ +app.get('/workflows/:id', async (c) => { + const instanceId = c.req.param('id'); + + try { + const instance = await c.env.MY_WORKFLOW.get(instanceId); + const status = await instance.status(); + + return c.json({ + id: instanceId, + ...status + }); + } catch (error) { + return c.json({ + error: 'Workflow not found', + message: error instanceof Error ? error.message : 'Unknown error' + }, 404); + } +}); + +/** + * Send event to waiting workflow + */ +app.post('/workflows/:id/events', async (c) => { + const instanceId = c.req.param('id'); + const body = await c.req.json<{ + type: string; + payload: any; + }>(); + + try { + const instance = await c.env.MY_WORKFLOW.get(instanceId); + + await instance.sendEvent({ + type: body.type, + payload: body.payload + }); + + return c.json({ + success: true, + message: 'Event sent to workflow' + }); + } catch (error) { + return c.json({ + error: 'Failed to send event', + message: error instanceof Error ? error.message : 'Unknown error' + }, 500); + } +}); + +/** + * Pause workflow instance + */ +app.post('/workflows/:id/pause', async (c) => { + const instanceId = c.req.param('id'); + + try { + const instance = await c.env.MY_WORKFLOW.get(instanceId); + await instance.pause(); + + // Update database + await c.env.DB.prepare(` + UPDATE workflow_instances SET status = ? WHERE id = ? + `).bind('paused', instanceId).run(); + + return c.json({ + success: true, + message: 'Workflow paused' + }); + } catch (error) { + return c.json({ + error: 'Failed to pause workflow', + message: error instanceof Error ? error.message : 'Unknown error' + }, 500); + } +}); + +/** + * Resume paused workflow instance + */ +app.post('/workflows/:id/resume', async (c) => { + const instanceId = c.req.param('id'); + + try { + const instance = await c.env.MY_WORKFLOW.get(instanceId); + await instance.resume(); + + // Update database + await c.env.DB.prepare(` + UPDATE workflow_instances SET status = ? WHERE id = ? + `).bind('running', instanceId).run(); + + return c.json({ + success: true, + message: 'Workflow resumed' + }); + } catch (error) { + return c.json({ + error: 'Failed to resume workflow', + message: error instanceof Error ? error.message : 'Unknown error' + }, 500); + } +}); + +/** + * Terminate workflow instance + */ +app.post('/workflows/:id/terminate', async (c) => { + const instanceId = c.req.param('id'); + + try { + const instance = await c.env.MY_WORKFLOW.get(instanceId); + await instance.terminate(); + + // Update database + await c.env.DB.prepare(` + UPDATE workflow_instances SET status = ? WHERE id = ? + `).bind('terminated', instanceId).run(); + + return c.json({ + success: true, + message: 'Workflow terminated' + }); + } catch (error) { + return c.json({ + error: 'Failed to terminate workflow', + message: error instanceof Error ? error.message : 'Unknown error' + }, 500); + } +}); + +/** + * List all workflow instances (with filtering) + */ +app.get('/workflows', async (c) => { + const status = c.req.query('status'); + const userId = c.req.query('userId'); + const limit = parseInt(c.req.query('limit') || '20'); + const offset = parseInt(c.req.query('offset') || '0'); + + let query = 'SELECT * FROM workflow_instances WHERE 1=1'; + const params: any[] = []; + + if (status) { + query += ' AND status = ?'; + params.push(status); + } + + if (userId) { + query += ' AND user_id = ?'; + params.push(userId); + } + + query += ' ORDER BY created_at DESC LIMIT ? OFFSET ?'; + params.push(limit, offset); + + try { + const results = await c.env.DB.prepare(query).bind(...params).all(); + + return c.json({ + workflows: results.results, + limit, + offset, + total: results.results.length + }); + } catch (error) { + return c.json({ + error: 'Failed to list workflows', + message: error instanceof Error ? error.message : 'Unknown error' + }, 500); + } +}); + +/** + * Health check + */ +app.get('/health', (c) => { + return c.json({ + status: 'ok', + timestamp: new Date().toISOString() + }); +}); + +/** + * API documentation + */ +app.get('/', (c) => { + return c.json({ + name: 'Workflow Management API', + version: '1.0.0', + endpoints: { + 'POST /workflows/create': { + description: 'Create new workflow instance', + body: { userId: 'string', email: 'string', ...params: 'any' } + }, + 'GET /workflows/:id': { + description: 'Get workflow status', + params: { id: 'workflow instance ID' } + }, + 'POST /workflows/:id/events': { + description: 'Send event to workflow', + params: { id: 'workflow instance ID' }, + body: { type: 'string', payload: 'any' } + }, + 'POST /workflows/:id/pause': { + description: 'Pause workflow', + params: { id: 'workflow instance ID' } + }, + 'POST /workflows/:id/resume': { + description: 'Resume paused workflow', + params: { id: 'workflow instance ID' } + }, + 'POST /workflows/:id/terminate': { + description: 'Terminate workflow', + params: { id: 'workflow instance ID' } + }, + 'GET /workflows': { + description: 'List workflows', + query: { status: 'string (optional)', userId: 'string (optional)', limit: 'number', offset: 'number' } + } + } + }); +}); + +export default app; diff --git a/templates/workflow-with-events.ts b/templates/workflow-with-events.ts new file mode 100644 index 0000000..0ec96dd --- /dev/null +++ b/templates/workflow-with-events.ts @@ -0,0 +1,335 @@ +/** + * Event-Driven Workflow Example + * + * Demonstrates: + * - step.waitForEvent() for external events + * - instance.sendEvent() to trigger waiting workflows + * - Timeout handling + * - Human-in-the-loop patterns + */ + +import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; + +type Env = { + APPROVAL_WORKFLOW: Workflow; + DB: D1Database; +}; + +type ApprovalParams = { + requestId: string; + requesterId: string; + amount: number; + description: string; +}; + +type ApprovalEvent = { + approved: boolean; + approverId: string; + comments?: string; +}; + +/** + * Approval Workflow with Event Waiting + * + * Flow: + * 1. Create approval request + * 2. Notify approvers + * 3. Wait for approval decision (max 7 days) + * 4. Process decision + * 5. Execute approved action or reject + */ +export class ApprovalWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { requestId, requesterId, amount, description } = event.payload; + + // Step 1: Create approval request in database + await step.do('create approval request', async () => { + await this.env.DB.prepare(` + INSERT INTO approval_requests + (id, requester_id, amount, description, status, created_at) + VALUES (?, ?, ?, ?, ?, ?) + `).bind( + requestId, + requesterId, + amount, + description, + 'pending', + new Date().toISOString() + ).run(); + + return { created: true }; + }); + + // Step 2: Send notification to approvers + await step.do('notify approvers', async () => { + // Get list of approvers based on amount + const approvers = amount > 10000 + ? ['senior-manager@example.com', 'finance@example.com'] + : ['manager@example.com']; + + // Send notification to each approver + await fetch('https://api.example.com/send-notifications', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + recipients: approvers, + subject: `Approval Required: ${description}`, + body: ` + Request ID: ${requestId} + Amount: $${amount} + Description: ${description} + Requester: ${requesterId} + + Please review and approve/reject at: + https://app.example.com/approvals/${requestId} + `, + data: { + requestId, + workflowInstanceId: event.instanceId // Store for sending event later + } + }) + }); + + return { notified: true, approvers }; + }); + + // Step 3: Wait for approval decision (max 7 days) + let approvalEvent: ApprovalEvent; + + try { + approvalEvent = await step.waitForEvent( + 'wait for approval decision', + { + type: 'approval-decision', + timeout: '7 days' // Auto-reject after 7 days + } + ); + + console.log('Approval decision received:', approvalEvent); + } catch (error) { + // Timeout occurred - auto-reject + console.log('Approval timeout - auto-rejecting'); + + await step.do('auto-reject due to timeout', async () => { + await this.env.DB.prepare(` + UPDATE approval_requests + SET status = ?, updated_at = ?, rejection_reason = ? + WHERE id = ? + `).bind( + 'rejected', + new Date().toISOString(), + 'Approval timeout - no response within 7 days', + requestId + ).run(); + + // Notify requester + await this.notifyRequester(requesterId, requestId, false, 'Approval timeout'); + + return { rejected: true, reason: 'timeout' }; + }); + + return { + requestId, + status: 'rejected', + reason: 'timeout' + }; + } + + // Step 4: Process approval decision + await step.do('process approval decision', async () => { + await this.env.DB.prepare(` + UPDATE approval_requests + SET status = ?, approver_id = ?, comments = ?, updated_at = ? + WHERE id = ? + `).bind( + approvalEvent.approved ? 'approved' : 'rejected', + approvalEvent.approverId, + approvalEvent.comments || null, + new Date().toISOString(), + requestId + ).run(); + + return { processed: true }; + }); + + // Step 5: Notify requester + await step.do('notify requester', async () => { + await this.notifyRequester( + requesterId, + requestId, + approvalEvent.approved, + approvalEvent.comments + ); + + return { notified: true }; + }); + + // Step 6: Execute approved action if approved + if (approvalEvent.approved) { + await step.do('execute approved action', async () => { + // Execute the action that was approved + console.log(`Executing approved action for request ${requestId}`); + + // Example: Process payment, create resource, etc. + await fetch('https://api.example.com/execute-action', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + requestId, + amount, + description + }) + }); + + return { executed: true }; + }); + } + + return { + requestId, + status: approvalEvent.approved ? 'approved' : 'rejected', + approver: approvalEvent.approverId + }; + } + + /** + * Send notification to requester + */ + private async notifyRequester( + requesterId: string, + requestId: string, + approved: boolean, + comments?: string + ) { + await fetch('https://api.example.com/send-notification', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + recipient: requesterId, + subject: `Request ${requestId} ${approved ? 'Approved' : 'Rejected'}`, + body: ` + Your request ${requestId} has been ${approved ? 'approved' : 'rejected'}. + ${comments ? `\n\nComments: ${comments}` : ''} + ` + }) + }); + } +} + +/** + * Worker that handles: + * 1. Creating approval workflows + * 2. Receiving approval decisions via webhook + * 3. Sending events to waiting workflows + */ +export default { + async fetch(req: Request, env: Env): Promise { + const url = new URL(req.url); + + if (url.pathname.startsWith('/favicon')) { + return Response.json({}, { status: 404 }); + } + + // Endpoint: Create new approval request + if (url.pathname === '/approvals/create' && req.method === 'POST') { + const body = await req.json(); + + // Create workflow instance + const instance = await env.APPROVAL_WORKFLOW.create({ + params: body + }); + + // Store instance ID for later (when approval decision comes in) + // In production, store this in DB/KV + await env.DB.prepare(` + UPDATE approval_requests + SET workflow_instance_id = ? + WHERE id = ? + `).bind(instance.id, body.requestId).run(); + + return Response.json({ + id: instance.id, + requestId: body.requestId, + status: await instance.status() + }); + } + + // Endpoint: Submit approval decision (webhook from approval UI) + if (url.pathname === '/approvals/decide' && req.method === 'POST') { + const body = await req.json<{ + requestId: string; + approved: boolean; + approverId: string; + comments?: string; + }>(); + + // Get workflow instance ID from database + const result = await env.DB.prepare(` + SELECT workflow_instance_id + FROM approval_requests + WHERE id = ? + `).bind(body.requestId).first<{ workflow_instance_id: string }>(); + + if (!result) { + return Response.json( + { error: 'Request not found' }, + { status: 404 } + ); + } + + // Get workflow instance + const instance = await env.APPROVAL_WORKFLOW.get(result.workflow_instance_id); + + // Send event to waiting workflow + await instance.sendEvent({ + type: 'approval-decision', + payload: { + approved: body.approved, + approverId: body.approverId, + comments: body.comments + } + }); + + return Response.json({ + success: true, + message: 'Approval decision sent to workflow' + }); + } + + // Endpoint: Get approval status + if (url.pathname.startsWith('/approvals/') && req.method === 'GET') { + const requestId = url.pathname.split('/')[2]; + + const result = await env.DB.prepare(` + SELECT workflow_instance_id, status + FROM approval_requests + WHERE id = ? + `).bind(requestId).first<{ workflow_instance_id: string; status: string }>(); + + if (!result) { + return Response.json( + { error: 'Request not found' }, + { status: 404 } + ); + } + + const instance = await env.APPROVAL_WORKFLOW.get(result.workflow_instance_id); + const workflowStatus = await instance.status(); + + return Response.json({ + requestId, + dbStatus: result.status, + workflowStatus + }); + } + + // Default: Show usage + return Response.json({ + endpoints: { + 'POST /approvals/create': 'Create approval request', + 'POST /approvals/decide': 'Submit approval decision', + 'GET /approvals/:id': 'Get approval status' + } + }); + } +}; diff --git a/templates/workflow-with-retries.ts b/templates/workflow-with-retries.ts new file mode 100644 index 0000000..8697817 --- /dev/null +++ b/templates/workflow-with-retries.ts @@ -0,0 +1,235 @@ +/** + * Workflow with Advanced Retry Configuration + * + * Demonstrates: + * - Custom retry limits + * - Exponential, linear, and constant backoff + * - Step timeouts + * - NonRetryableError for terminal failures + * - Error handling with try-catch + */ + +import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'; +import { NonRetryableError } from 'cloudflare:workflows'; + +type Env = { + MY_WORKFLOW: Workflow; +}; + +type PaymentParams = { + orderId: string; + amount: number; + customerId: string; +}; + +/** + * Payment Processing Workflow with Retries + * + * Handles payment processing with: + * - Validation with NonRetryableError + * - Retry logic for payment gateway + * - Fallback to backup gateway + * - Graceful error handling + */ +export class PaymentWorkflow extends WorkflowEntrypoint { + async run(event: WorkflowEvent, step: WorkflowStep) { + const { orderId, amount, customerId } = event.payload; + + // Step 1: Validate input (no retries - fail fast) + await step.do( + 'validate payment request', + { + retries: { + limit: 0 // No retries for validation + } + }, + async () => { + if (!orderId || !customerId) { + throw new NonRetryableError('Missing required fields: orderId or customerId'); + } + + if (amount <= 0) { + throw new NonRetryableError(`Invalid amount: ${amount}`); + } + + if (amount > 100000) { + throw new NonRetryableError(`Amount exceeds limit: ${amount}`); + } + + return { valid: true }; + } + ); + + // Step 2: Call primary payment gateway (exponential backoff) + let paymentResult; + + try { + paymentResult = await step.do( + 'charge primary payment gateway', + { + retries: { + limit: 5, // Max 5 retry attempts + delay: '10 seconds', // Start at 10 seconds + backoff: 'exponential' // 10s, 20s, 40s, 80s, 160s + }, + timeout: '2 minutes' // Each attempt times out after 2 minutes + }, + async () => { + const response = await fetch('https://primary-payment-gateway.example.com/charge', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + orderId, + amount, + customerId + }) + }); + + if (!response.ok) { + // Check if error is retryable + if (response.status === 401 || response.status === 403) { + throw new NonRetryableError('Authentication failed with payment gateway'); + } + + throw new Error(`Payment gateway error: ${response.status}`); + } + + const data = await response.json(); + return { + transactionId: data.transactionId, + status: data.status, + gateway: 'primary' + }; + } + ); + } catch (error) { + console.error('Primary gateway failed:', error); + + // Step 3: Fallback to backup gateway (linear backoff) + paymentResult = await step.do( + 'charge backup payment gateway', + { + retries: { + limit: 3, + delay: '30 seconds', + backoff: 'linear' // 30s, 60s, 90s + }, + timeout: '3 minutes' + }, + async () => { + const response = await fetch('https://backup-payment-gateway.example.com/charge', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + orderId, + amount, + customerId + }) + }); + + if (!response.ok) { + throw new Error(`Backup gateway error: ${response.status}`); + } + + const data = await response.json(); + return { + transactionId: data.transactionId, + status: data.status, + gateway: 'backup' + }; + } + ); + } + + // Step 4: Update order status (constant backoff) + await step.do( + 'update order status', + { + retries: { + limit: 10, + delay: '5 seconds', + backoff: 'constant' // Always 5 seconds between retries + }, + timeout: '30 seconds' + }, + async () => { + const response = await fetch(`https://api.example.com/orders/${orderId}`, { + method: 'PATCH', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + status: 'paid', + transactionId: paymentResult.transactionId, + gateway: paymentResult.gateway + }) + }); + + if (!response.ok) { + throw new Error('Failed to update order status'); + } + + return { updated: true }; + } + ); + + // Step 5: Send confirmation (optional - don't fail workflow if this fails) + try { + await step.do( + 'send payment confirmation', + { + retries: { + limit: 3, + delay: '10 seconds', + backoff: 'exponential' + } + }, + async () => { + await fetch('https://api.example.com/notifications/payment-confirmed', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + orderId, + customerId, + amount + }) + }); + + return { sent: true }; + } + ); + } catch (error) { + // Log but don't fail workflow + console.error('Failed to send confirmation:', error); + } + + return { + orderId, + transactionId: paymentResult.transactionId, + gateway: paymentResult.gateway, + status: 'complete' + }; + } +} + +export default { + async fetch(req: Request, env: Env): Promise { + const url = new URL(req.url); + + if (url.pathname.startsWith('/favicon')) { + return Response.json({}, { status: 404 }); + } + + // Create payment workflow + const instance = await env.MY_WORKFLOW.create({ + params: { + orderId: 'ORD-' + Date.now(), + amount: 99.99, + customerId: 'CUST-123' + } + }); + + return Response.json({ + id: instance.id, + status: await instance.status() + }); + } +}; diff --git a/templates/wrangler-workflows-config.jsonc b/templates/wrangler-workflows-config.jsonc new file mode 100644 index 0000000..3c02f43 --- /dev/null +++ b/templates/wrangler-workflows-config.jsonc @@ -0,0 +1,171 @@ +/** + * Complete Wrangler Configuration for Workflows + * + * This file shows all configuration options for Cloudflare Workflows. + * Copy and adapt to your project's needs. + */ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "my-workflow-project", + "main": "src/index.ts", + "account_id": "YOUR_ACCOUNT_ID", + "compatibility_date": "2025-10-22", + + /** + * Workflows Configuration + * + * Define workflows that can be triggered from Workers. + * Each workflow binding makes a workflow class available via env.BINDING_NAME + */ + "workflows": [ + { + /** + * name: The name of the workflow (used in dashboard, CLI commands) + * This is the workflow identifier for wrangler commands like: + * - npx wrangler workflows instances list my-workflow + * - npx wrangler workflows instances describe my-workflow + */ + "name": "my-workflow", + + /** + * binding: The name used in your code to access this workflow + * Available in Workers as: env.MY_WORKFLOW + */ + "binding": "MY_WORKFLOW", + + /** + * class_name: The exported class name that extends WorkflowEntrypoint + * Must match your TypeScript class: + * export class MyWorkflow extends WorkflowEntrypoint { ... } + */ + "class_name": "MyWorkflow" + }, + + /** + * Example: Multiple workflows in one project + */ + { + "name": "payment-workflow", + "binding": "PAYMENT_WORKFLOW", + "class_name": "PaymentWorkflow" + }, + { + "name": "approval-workflow", + "binding": "APPROVAL_WORKFLOW", + "class_name": "ApprovalWorkflow" + } + + /** + * Example: Workflow in different Worker script + * + * If your workflow is defined in a separate Worker script, + * use script_name to reference it + */ + // { + // "name": "external-workflow", + // "binding": "EXTERNAL_WORKFLOW", + // "class_name": "ExternalWorkflow", + // "script_name": "workflow-worker" // Name of Worker that contains the workflow + // } + ], + + /** + * Optional: Other Cloudflare bindings + */ + + // KV Namespace + // "kv_namespaces": [ + // { + // "binding": "MY_KV", + // "id": "YOUR_KV_ID" + // } + // ], + + // D1 Database + // "d1_databases": [ + // { + // "binding": "DB", + // "database_name": "my-database", + // "database_id": "YOUR_DB_ID" + // } + // ], + + // R2 Bucket + // "r2_buckets": [ + // { + // "binding": "MY_BUCKET", + // "bucket_name": "my-bucket" + // } + // ], + + // Queues + // "queues": { + // "producers": [ + // { + // "binding": "MY_QUEUE", + // "queue": "my-queue" + // } + // ] + // }, + + /** + * Optional: Environment variables + */ + // "vars": { + // "ENVIRONMENT": "production", + // "API_URL": "https://api.example.com" + // }, + + /** + * Optional: Observability + */ + "observability": { + "enabled": true + }, + + /** + * Optional: Multiple environments + */ + "env": { + "staging": { + "vars": { + "ENVIRONMENT": "staging" + }, + "workflows": [ + { + "name": "my-workflow-staging", + "binding": "MY_WORKFLOW", + "class_name": "MyWorkflow" + } + ] + }, + "production": { + "vars": { + "ENVIRONMENT": "production" + }, + "workflows": [ + { + "name": "my-workflow-production", + "binding": "MY_WORKFLOW", + "class_name": "MyWorkflow" + } + ] + } + } +} + +/** + * Deployment Commands: + * + * # Deploy to default (production) + * npx wrangler deploy + * + * # Deploy to staging environment + * npx wrangler deploy --env staging + * + * # List workflow instances + * npx wrangler workflows instances list my-workflow + * + * # Get instance status + * npx wrangler workflows instances describe my-workflow + */