Initial commit
This commit is contained in:
12
.claude-plugin/plugin.json
Normal file
12
.claude-plugin/plugin.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"name": "cloudflare-workflows",
|
||||
"description": "Build durable, long-running workflows on Cloudflare Workers with automatic retries, state persistence, and multi-step orchestration. Supports step.do, step.sleep, step.waitForEvent, and runs for hours to days. Use when: creating long-running workflows, implementing retry logic, building event-driven processes, coordinating API calls, scheduling multi-step tasks, or troubleshooting NonRetryableError, I/O context, serialization errors, or workflow execution failures.",
|
||||
"version": "1.0.0",
|
||||
"author": {
|
||||
"name": "Jeremy Dawes",
|
||||
"email": "jeremy@jezweb.net"
|
||||
},
|
||||
"skills": [
|
||||
"./"
|
||||
]
|
||||
}
|
||||
3
README.md
Normal file
3
README.md
Normal file
@@ -0,0 +1,3 @@
|
||||
# cloudflare-workflows
|
||||
|
||||
Build durable, long-running workflows on Cloudflare Workers with automatic retries, state persistence, and multi-step orchestration. Supports step.do, step.sleep, step.waitForEvent, and runs for hours to days. Use when: creating long-running workflows, implementing retry logic, building event-driven processes, coordinating API calls, scheduling multi-step tasks, or troubleshooting NonRetryableError, I/O context, serialization errors, or workflow execution failures.
|
||||
589
SKILL.md
Normal file
589
SKILL.md
Normal file
@@ -0,0 +1,589 @@
|
||||
---
|
||||
name: cloudflare-workflows
|
||||
description: |
|
||||
Build durable workflows with Cloudflare Workflows (GA April 2025). Features step.do, step.sleep, waitForEvent,
|
||||
Vitest testing, and runs for hours to days with automatic retries and state persistence.
|
||||
|
||||
Use when: creating long-running workflows, implementing retry logic, building event-driven processes,
|
||||
testing workflows with cloudflare:test, coordinating API calls, or troubleshooting NonRetryableError,
|
||||
I/O context errors, serialization failures.
|
||||
|
||||
Keywords: cloudflare workflows, workflows workers, durable execution, workflow step,
|
||||
WorkflowEntrypoint, step.do, step.sleep, workflow retries, NonRetryableError,
|
||||
workflow state, wrangler workflows, workflow events, long-running tasks, step.sleepUntil,
|
||||
step.waitForEvent, workflow bindings, vitest testing, cloudflare:test, introspectWorkflowInstance
|
||||
license: MIT
|
||||
---
|
||||
|
||||
# Cloudflare Workflows
|
||||
|
||||
**Status**: Production Ready ✅ (GA since April 2025)
|
||||
**Last Updated**: 2025-11-25
|
||||
**Dependencies**: cloudflare-worker-base (for Worker setup)
|
||||
**Latest Versions**: wrangler@4.50.0, @cloudflare/workers-types@4.20251121.0
|
||||
|
||||
**Recent Updates (2025)**:
|
||||
- **April 2025**: Workflows GA release - waitForEvent API, Vitest testing, CPU time metrics, 4,500 concurrent instances
|
||||
- **October 2025**: Instance creation rate 10x faster (100/sec), concurrency increased to 10,000
|
||||
- **2025 Limits**: Max steps 1,024, state persistence 1MB/step (100MB-1GB per instance), event payloads 1MB, CPU time 5 min max
|
||||
- **Testing**: cloudflare:test module with introspectWorkflowInstance, disableSleeps, mockStepResult, mockEvent modifiers
|
||||
- **Platform**: Waiting instances don't count toward concurrency, retention 3-30 days, subrequests 50-1,000
|
||||
|
||||
---
|
||||
|
||||
## Quick Start (5 Minutes)
|
||||
|
||||
```bash
|
||||
# 1. Scaffold project
|
||||
npm create cloudflare@latest my-workflow -- --template cloudflare/workflows-starter --git --deploy false
|
||||
cd my-workflow
|
||||
|
||||
# 2. Configure wrangler.jsonc
|
||||
{
|
||||
"name": "my-workflow",
|
||||
"main": "src/index.ts",
|
||||
"compatibility_date": "2025-11-25",
|
||||
"workflows": [{
|
||||
"name": "my-workflow",
|
||||
"binding": "MY_WORKFLOW",
|
||||
"class_name": "MyWorkflow"
|
||||
}]
|
||||
}
|
||||
|
||||
# 3. Create workflow (src/index.ts)
|
||||
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
|
||||
|
||||
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
|
||||
const result = await step.do('process', async () => { /* work */ });
|
||||
await step.sleep('wait', '1 hour');
|
||||
await step.do('continue', async () => { /* more work */ });
|
||||
}
|
||||
}
|
||||
|
||||
# 4. Deploy and test
|
||||
npm run deploy
|
||||
npx wrangler workflows instances list my-workflow
|
||||
```
|
||||
|
||||
**CRITICAL**: Extends `WorkflowEntrypoint`, implements `run()` with `step` methods, bindings in wrangler.jsonc
|
||||
|
||||
---
|
||||
|
||||
|
||||
## Step Methods
|
||||
|
||||
### step.do() - Execute Work
|
||||
|
||||
```typescript
|
||||
step.do<T>(name: string, config?: WorkflowStepConfig, callback: () => Promise<T>): Promise<T>
|
||||
```
|
||||
|
||||
**Parameters:**
|
||||
- `name` - Step name (for observability)
|
||||
- `config` (optional) - Retry configuration (retries, timeout, backoff)
|
||||
- `callback` - Async function that does the work
|
||||
|
||||
**Returns:** Value from callback (must be serializable)
|
||||
|
||||
**Example:**
|
||||
```typescript
|
||||
const result = await step.do('call API', { retries: { limit: 10, delay: '10s', backoff: 'exponential' }, timeout: '5 min' }, async () => {
|
||||
return await fetch('https://api.example.com/data').then(r => r.json());
|
||||
});
|
||||
```
|
||||
|
||||
**CRITICAL - Serialization:**
|
||||
- ✅ Allowed: string, number, boolean, Array, Object, null
|
||||
- ❌ Forbidden: Function, Symbol, circular references, undefined
|
||||
- Throws error if return value isn't JSON serializable
|
||||
|
||||
---
|
||||
|
||||
### step.sleep() - Relative Sleep
|
||||
|
||||
```typescript
|
||||
step.sleep(name: string, duration: WorkflowDuration): Promise<void>
|
||||
```
|
||||
|
||||
**Parameters:**
|
||||
- `name` - Step name
|
||||
- `duration` - Number (ms) or string: `"second"`, `"minute"`, `"hour"`, `"day"`, `"week"`, `"month"`, `"year"` (plural forms accepted)
|
||||
|
||||
**Examples:**
|
||||
```typescript
|
||||
await step.sleep('wait 5 minutes', '5 minutes');
|
||||
await step.sleep('wait 1 hour', '1 hour');
|
||||
await step.sleep('wait 2 days', '2 days');
|
||||
await step.sleep('wait 30 seconds', 30000); // milliseconds
|
||||
```
|
||||
|
||||
**Note:** Resuming workflows take priority over new instances. Sleeps don't count toward step limits.
|
||||
|
||||
---
|
||||
|
||||
### step.sleepUntil() - Sleep to Specific Date
|
||||
|
||||
```typescript
|
||||
step.sleepUntil(name: string, timestamp: Date | number): Promise<void>
|
||||
```
|
||||
|
||||
**Parameters:**
|
||||
- `name` - Step name
|
||||
- `timestamp` - Date object or UNIX timestamp (milliseconds)
|
||||
|
||||
**Examples:**
|
||||
```typescript
|
||||
await step.sleepUntil('wait for launch', new Date('2025-12-25T00:00:00Z'));
|
||||
await step.sleepUntil('wait until time', Date.parse('24 Oct 2024 13:00:00 UTC'));
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### step.waitForEvent() - Wait for External Event (GA April 2025)
|
||||
|
||||
```typescript
|
||||
step.waitForEvent<T>(name: string, options: { type: string; timeout?: string | number }): Promise<T>
|
||||
```
|
||||
|
||||
**Parameters:**
|
||||
- `name` - Step name
|
||||
- `options.type` - Event type to match
|
||||
- `options.timeout` (optional) - Max wait time (default: 24 hours, max: 30 days)
|
||||
|
||||
**Returns:** Event payload sent via `instance.sendEvent()`
|
||||
|
||||
**Example:**
|
||||
```typescript
|
||||
export class PaymentWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
|
||||
await step.do('create payment', async () => { /* Stripe API */ });
|
||||
|
||||
const webhookData = await step.waitForEvent<StripeWebhook>(
|
||||
'wait for payment confirmation',
|
||||
{ type: 'stripe-webhook', timeout: '1 hour' }
|
||||
);
|
||||
|
||||
if (webhookData.status === 'succeeded') {
|
||||
await step.do('fulfill order', async () => { /* fulfill */ });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Worker sends event to workflow
|
||||
export default {
|
||||
async fetch(req: Request, env: Env): Promise<Response> {
|
||||
if (req.url.includes('/webhook/stripe')) {
|
||||
const instance = await env.PAYMENT_WORKFLOW.get(instanceId);
|
||||
await instance.sendEvent({ type: 'stripe-webhook', payload: await req.json() });
|
||||
return new Response('OK');
|
||||
}
|
||||
}
|
||||
};
|
||||
```
|
||||
|
||||
**Timeout handling:**
|
||||
```typescript
|
||||
try {
|
||||
const event = await step.waitForEvent('wait for user', { type: 'user-submitted', timeout: '10 minutes' });
|
||||
} catch (error) {
|
||||
await step.do('send reminder', async () => { /* reminder */ });
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## WorkflowStepConfig
|
||||
|
||||
```typescript
|
||||
interface WorkflowStepConfig {
|
||||
retries?: {
|
||||
limit: number; // Max attempts (Infinity allowed)
|
||||
delay: string | number; // Delay between retries
|
||||
backoff?: 'constant' | 'linear' | 'exponential';
|
||||
};
|
||||
timeout?: string | number; // Max time per attempt
|
||||
}
|
||||
```
|
||||
|
||||
**Default:** `{ retries: { limit: 5, delay: 10000, backoff: 'exponential' }, timeout: '10 minutes' }`
|
||||
|
||||
**Backoff Examples:**
|
||||
```typescript
|
||||
// Constant: 30s, 30s, 30s
|
||||
{ retries: { limit: 3, delay: '30 seconds', backoff: 'constant' } }
|
||||
|
||||
// Linear: 1m, 2m, 3m, 4m, 5m
|
||||
{ retries: { limit: 5, delay: '1 minute', backoff: 'linear' } }
|
||||
|
||||
// Exponential (recommended): 10s, 20s, 40s, 80s, 160s
|
||||
{ retries: { limit: 10, delay: '10 seconds', backoff: 'exponential' }, timeout: '5 minutes' }
|
||||
|
||||
// Unlimited retries
|
||||
{ retries: { limit: Infinity, delay: '1 minute', backoff: 'exponential' } }
|
||||
|
||||
// No retries
|
||||
{ retries: { limit: 0 } }
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Error Handling
|
||||
|
||||
### NonRetryableError
|
||||
|
||||
Force workflow to fail immediately without retrying:
|
||||
|
||||
```typescript
|
||||
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
|
||||
import { NonRetryableError } from 'cloudflare:workflows';
|
||||
|
||||
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
|
||||
await step.do('validate input', async () => {
|
||||
if (!event.payload.userId) {
|
||||
throw new NonRetryableError('userId is required');
|
||||
}
|
||||
|
||||
// Validate user exists
|
||||
const user = await this.env.DB.prepare(
|
||||
'SELECT * FROM users WHERE id = ?'
|
||||
).bind(event.payload.userId).first();
|
||||
|
||||
if (!user) {
|
||||
// Terminal error - retrying won't help
|
||||
throw new NonRetryableError('User not found');
|
||||
}
|
||||
|
||||
return user;
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**When to use NonRetryableError:**
|
||||
- ✅ Authentication/authorization failures
|
||||
- ✅ Invalid input that won't change
|
||||
- ✅ Resource doesn't exist (404)
|
||||
- ✅ Validation errors
|
||||
- ❌ Network failures (should retry)
|
||||
- ❌ Rate limits (should retry with backoff)
|
||||
- ❌ Temporary service outages (should retry)
|
||||
|
||||
---
|
||||
|
||||
### Catch Errors to Continue Workflow
|
||||
|
||||
Prevent workflow failure by catching optional step errors:
|
||||
|
||||
```typescript
|
||||
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
|
||||
await step.do('process payment', async () => { /* critical */ });
|
||||
|
||||
try {
|
||||
await step.do('send email', async () => { /* optional */ });
|
||||
} catch (error) {
|
||||
await step.do('log failure', async () => {
|
||||
await this.env.DB.prepare('INSERT INTO failed_emails VALUES (?, ?)').bind(event.payload.userId, error.message).run();
|
||||
});
|
||||
}
|
||||
|
||||
await step.do('update status', async () => { /* continues */ });
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Graceful Degradation:**
|
||||
```typescript
|
||||
let result;
|
||||
try {
|
||||
result = await step.do('call primary API', async () => await callPrimaryAPI());
|
||||
} catch {
|
||||
result = await step.do('call backup API', async () => await callBackupAPI());
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Triggering Workflows
|
||||
|
||||
**Configure binding (wrangler.jsonc):**
|
||||
```jsonc
|
||||
{
|
||||
"workflows": [{
|
||||
"name": "my-workflow",
|
||||
"binding": "MY_WORKFLOW",
|
||||
"class_name": "MyWorkflow",
|
||||
"script_name": "workflow-worker" // If workflow in different Worker
|
||||
}]
|
||||
}
|
||||
```
|
||||
|
||||
**Trigger from Worker:**
|
||||
```typescript
|
||||
const instance = await env.MY_WORKFLOW.create({ params: { userId: '123' } });
|
||||
return Response.json({ id: instance.id, status: await instance.status() });
|
||||
```
|
||||
|
||||
**Instance Management:**
|
||||
```typescript
|
||||
const instance = await env.MY_WORKFLOW.get(instanceId);
|
||||
const status = await instance.status(); // { status: 'running'|'complete'|'errored'|'queued', error, output }
|
||||
await instance.sendEvent({ type: 'user-action', payload: { action: 'approved' } });
|
||||
await instance.pause();
|
||||
await instance.resume();
|
||||
await instance.terminate();
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
|
||||
## State Persistence
|
||||
|
||||
Workflows automatically persist state returned from `step.do()`:
|
||||
|
||||
**✅ Serializable:**
|
||||
- Primitives: `string`, `number`, `boolean`, `null`
|
||||
- Arrays, Objects, Nested structures
|
||||
|
||||
**❌ Non-Serializable:**
|
||||
- Functions, Symbols, circular references, undefined, class instances
|
||||
|
||||
**Example:**
|
||||
```typescript
|
||||
// ✅ Good
|
||||
const result = await step.do('fetch data', async () => ({
|
||||
users: [{ id: 1, name: 'Alice' }],
|
||||
timestamp: Date.now(),
|
||||
metadata: null
|
||||
}));
|
||||
|
||||
// ❌ Bad - function not serializable
|
||||
const bad = await step.do('bad', async () => ({ data: [1, 2, 3], transform: (x) => x * 2 })); // Throws error!
|
||||
```
|
||||
|
||||
**Access State Across Steps:**
|
||||
```typescript
|
||||
const userData = await step.do('fetch user', async () => ({ id: 123, email: 'user@example.com' }));
|
||||
const orderData = await step.do('create order', async () => ({ userId: userData.id, orderId: 'ORD-456' }));
|
||||
await step.do('send email', async () => sendEmail({ to: userData.email, subject: `Order ${orderData.orderId}` }));
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Observability
|
||||
|
||||
### Built-in Metrics (Enhanced in 2025)
|
||||
|
||||
Workflows automatically track:
|
||||
- **Instance status**: queued, running, complete, errored, paused, waiting
|
||||
- **Step execution**: start/end times, duration, success/failure
|
||||
- **Retry history**: attempts, errors, delays
|
||||
- **Sleep state**: when workflow will wake up
|
||||
- **Output**: return values from steps and run()
|
||||
- **CPU time** (GA April 2025): Active processing time per instance for billing insights
|
||||
|
||||
### View Metrics in Dashboard
|
||||
|
||||
Access via Cloudflare dashboard:
|
||||
1. Workers & Pages
|
||||
2. Select your workflow
|
||||
3. View instances and metrics
|
||||
|
||||
**Metrics include:**
|
||||
- Total instances created
|
||||
- Success/error rates
|
||||
- Average execution time
|
||||
- Step-level performance
|
||||
- **CPU time consumption** (2025 feature)
|
||||
|
||||
### Programmatic Access
|
||||
|
||||
```typescript
|
||||
const instance = await env.MY_WORKFLOW.get(instanceId);
|
||||
const status = await instance.status();
|
||||
|
||||
console.log(status);
|
||||
// {
|
||||
// status: 'complete' | 'running' | 'errored' | 'queued' | 'waiting' | 'unknown',
|
||||
// error: string | null,
|
||||
// output: { userId: '123', status: 'processed' }
|
||||
// }
|
||||
```
|
||||
|
||||
**CPU Time Configuration (2025):**
|
||||
```jsonc
|
||||
// wrangler.jsonc
|
||||
{ "limits": { "cpu_ms": 300000 } } // 5 minutes max (default: 30 seconds)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Limits (Updated 2025)
|
||||
|
||||
| Feature | Workers Free | Workers Paid |
|
||||
|---------|--------------|--------------|
|
||||
| **Max steps per workflow** | 1,024 | 1,024 |
|
||||
| **Max state per step** | 1 MiB | 1 MiB |
|
||||
| **Max state per instance** | 100 MB | 1 GB |
|
||||
| **Max event payload size** | 1 MiB | 1 MiB |
|
||||
| **Max sleep/sleepUntil duration** | 365 days | 365 days |
|
||||
| **Max waitForEvent timeout** | 365 days | 365 days |
|
||||
| **CPU time per step** | 10 ms | 30 sec (default), 5 min (max) |
|
||||
| **Duration (wall clock) per step** | Unlimited | Unlimited |
|
||||
| **Max workflow executions** | 100,000/day | Unlimited |
|
||||
| **Concurrent instances** | 25 | 10,000 (Oct 2025, up from 4,500) |
|
||||
| **Instance creation rate** | 100/second | 100/second (Oct 2025, 10x faster) |
|
||||
| **Max queued instances** | 100,000 | 1,000,000 |
|
||||
| **Max subrequests per instance** | 50/request | 1,000/request |
|
||||
| **Retention (completed state)** | 3 days | 30 days |
|
||||
| **Max Workflow name length** | 64 chars | 64 chars |
|
||||
| **Max instance ID length** | 100 chars | 100 chars |
|
||||
|
||||
**CRITICAL Notes:**
|
||||
- `step.sleep()` and `step.sleepUntil()` do NOT count toward 1,024 step limit
|
||||
- **Waiting instances** (sleeping, retrying, or waiting for events) do NOT count toward concurrency limits
|
||||
- Instance creation rate increased 10x (October 2025): 100 per 10 seconds → 100 per second
|
||||
- Max concurrency increased (October 2025): 4,500 → 10,000 concurrent instances
|
||||
- State persistence limits increased (2025): 128 KB → 1 MiB per step, 100 MB - 1 GB per instance
|
||||
- Event payload size increased (2025): 128 KB → 1 MiB
|
||||
- CPU time configurable via `wrangler.jsonc`: `{ "limits": { "cpu_ms": 300000 } }` (5 min max)
|
||||
|
||||
---
|
||||
|
||||
## Pricing
|
||||
|
||||
**Requires Workers Paid plan** ($5/month)
|
||||
|
||||
**Workflow Executions:**
|
||||
- First 10,000,000 step executions/month: **FREE**
|
||||
- After that: **$0.30 per million step executions**
|
||||
|
||||
**What counts as a step execution:**
|
||||
- Each `step.do()` call
|
||||
- Each retry of a step
|
||||
- `step.sleep()`, `step.sleepUntil()`, `step.waitForEvent()` do NOT count
|
||||
|
||||
**Cost examples:**
|
||||
- Workflow with 5 steps, no retries: **5 step executions**
|
||||
- Workflow with 3 steps, 1 step retries 2 times: **5 step executions** (3 + 2)
|
||||
- 10M simple workflows/month (5 steps each): ((50M - 10M) / 1M) × $0.30 = **$12/month**
|
||||
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Issue: "Cannot perform I/O on behalf of a different request"
|
||||
|
||||
**Cause:** Trying to use I/O objects created in one request context from another request handler
|
||||
|
||||
**Solution:** Always perform I/O within `step.do()` callbacks
|
||||
|
||||
```typescript
|
||||
// ❌ Bad - I/O outside step
|
||||
const response = await fetch('https://api.example.com/data');
|
||||
const data = await response.json();
|
||||
|
||||
await step.do('use data', async () => {
|
||||
// Using data from outside step's I/O context
|
||||
return data; // This will fail!
|
||||
});
|
||||
|
||||
// ✅ Good - I/O inside step
|
||||
const data = await step.do('fetch data', async () => {
|
||||
const response = await fetch('https://api.example.com/data');
|
||||
return await response.json(); // ✅ Correct
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Issue: NonRetryableError behaves differently in dev vs production
|
||||
|
||||
**Known Issue:** Throwing NonRetryableError with empty message in dev mode causes retries, but works correctly in production
|
||||
|
||||
**Workaround:** Always provide a message to NonRetryableError
|
||||
|
||||
```typescript
|
||||
// ❌ May retry in dev
|
||||
throw new NonRetryableError();
|
||||
|
||||
// ✅ Works consistently
|
||||
throw new NonRetryableError('User not found');
|
||||
```
|
||||
|
||||
**Source:** [workers-sdk#10113](https://github.com/cloudflare/workers-sdk/issues/10113)
|
||||
|
||||
---
|
||||
|
||||
## Vitest Testing (GA April 2025)
|
||||
|
||||
Workflows support full testing integration via `cloudflare:test` module.
|
||||
|
||||
### Setup
|
||||
|
||||
```bash
|
||||
npm install -D vitest@latest @cloudflare/vitest-pool-workers@latest
|
||||
```
|
||||
|
||||
**vitest.config.ts:**
|
||||
```typescript
|
||||
import { defineWorkersConfig } from '@cloudflare/vitest-pool-workers/config';
|
||||
export default defineWorkersConfig({ test: { poolOptions: { workers: { miniflare: { bindings: { MY_WORKFLOW: { scriptName: 'workflow' } } } } } } });
|
||||
```
|
||||
|
||||
### Introspection API
|
||||
|
||||
```typescript
|
||||
import { env, introspectWorkflowInstance } from 'cloudflare:test';
|
||||
|
||||
it('should complete workflow', async () => {
|
||||
const instance = await introspectWorkflowInstance(env.MY_WORKFLOW, 'test-123');
|
||||
|
||||
try {
|
||||
await instance.modify(async (m) => {
|
||||
await m.disableSleeps(); // Skip all sleeps
|
||||
await m.mockStepResult({ name: 'fetch data' }, { users: [{ id: 1 }] }); // Mock step result
|
||||
await m.mockEvent({ type: 'approval', payload: { approved: true } }); // Send mock event
|
||||
await m.mockStepError({ name: 'call API' }, new Error('Network timeout'), 1); // Force error once
|
||||
});
|
||||
|
||||
await env.MY_WORKFLOW.create({ id: 'test-123' });
|
||||
await expect(instance.waitForStatus('complete')).resolves.not.toThrow();
|
||||
} finally {
|
||||
await instance.dispose(); // Cleanup
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
### Test Modifiers
|
||||
|
||||
- `disableSleeps(steps?)` - Skip sleeps instantly
|
||||
- `mockStepResult(step, result)` - Mock step.do() result
|
||||
- `mockStepError(step, error, times?)` - Force step.do() to throw
|
||||
- `mockEvent(event)` - Send mock event to step.waitForEvent()
|
||||
- `forceStepTimeout(step, times?)` - Force step.do() timeout
|
||||
- `forceEventTimeout(step)` - Force step.waitForEvent() timeout
|
||||
|
||||
**Official Docs**: https://developers.cloudflare.com/workers/testing/vitest-integration/
|
||||
|
||||
---
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- **Cloudflare Workflows Docs**: https://developers.cloudflare.com/workflows/
|
||||
- **Get Started Guide**: https://developers.cloudflare.com/workflows/get-started/guide/
|
||||
- **Workers API**: https://developers.cloudflare.com/workflows/build/workers-api/
|
||||
- **Vitest Testing**: https://developers.cloudflare.com/workers/testing/vitest-integration/
|
||||
- **Sleeping and Retrying**: https://developers.cloudflare.com/workflows/build/sleeping-and-retrying/
|
||||
- **Events and Parameters**: https://developers.cloudflare.com/workflows/build/events-and-parameters/
|
||||
- **Limits**: https://developers.cloudflare.com/workflows/reference/limits/
|
||||
- **Pricing**: https://developers.cloudflare.com/workflows/platform/pricing/
|
||||
- **Changelog**: https://developers.cloudflare.com/workflows/reference/changelog/
|
||||
- **MCP Tool**: Use `mcp__cloudflare-docs__search_cloudflare_documentation` for latest docs
|
||||
|
||||
---
|
||||
|
||||
**Last Updated**: 2025-11-25
|
||||
**Version**: 1.0.0
|
||||
**Maintainer**: Jeremy Dawes | jeremy@jezweb.net
|
||||
77
plugin.lock.json
Normal file
77
plugin.lock.json
Normal file
@@ -0,0 +1,77 @@
|
||||
{
|
||||
"$schema": "internal://schemas/plugin.lock.v1.json",
|
||||
"pluginId": "gh:jezweb/claude-skills:skills/cloudflare-workflows",
|
||||
"normalized": {
|
||||
"repo": null,
|
||||
"ref": "refs/tags/v20251128.0",
|
||||
"commit": "dc24af870e89687c7c213c63d93b5328717e9c39",
|
||||
"treeHash": "928c2023d9f2b055674faca59bf755b330124301b94c312810ffb099a294812e",
|
||||
"generatedAt": "2025-11-28T10:18:57.228742Z",
|
||||
"toolVersion": "publish_plugins.py@0.2.0"
|
||||
},
|
||||
"origin": {
|
||||
"remote": "git@github.com:zhongweili/42plugin-data.git",
|
||||
"branch": "master",
|
||||
"commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390",
|
||||
"repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data"
|
||||
},
|
||||
"manifest": {
|
||||
"name": "cloudflare-workflows",
|
||||
"description": "Build durable, long-running workflows on Cloudflare Workers with automatic retries, state persistence, and multi-step orchestration. Supports step.do, step.sleep, step.waitForEvent, and runs for hours to days. Use when: creating long-running workflows, implementing retry logic, building event-driven processes, coordinating API calls, scheduling multi-step tasks, or troubleshooting NonRetryableError, I/O context, serialization errors, or workflow execution failures.",
|
||||
"version": "1.0.0"
|
||||
},
|
||||
"content": {
|
||||
"files": [
|
||||
{
|
||||
"path": "README.md",
|
||||
"sha256": "950015c9480bf32195e22418aaae3544642911a799ff3112ed314b5d6e854b07"
|
||||
},
|
||||
{
|
||||
"path": "SKILL.md",
|
||||
"sha256": "e6cd4e70d38e2ee2bf1eb3381f4324cacb85fe73db3679b6e14b835e5723fec2"
|
||||
},
|
||||
{
|
||||
"path": "references/common-issues.md",
|
||||
"sha256": "7964a10795380621d2ae6c3f6d8cdff81ac0ac8d876142331160446558e3c0e6"
|
||||
},
|
||||
{
|
||||
"path": "references/workflow-patterns.md",
|
||||
"sha256": "099ae2b8a1d190a6b8d6b74a1e464a96655d19059eceb075da0e60690380dcaf"
|
||||
},
|
||||
{
|
||||
"path": ".claude-plugin/plugin.json",
|
||||
"sha256": "1cda1aa87cf945ab9220c0b0b3cb2e41986a3de043d25b1978704a689604aab8"
|
||||
},
|
||||
{
|
||||
"path": "templates/wrangler-workflows-config.jsonc",
|
||||
"sha256": "6eae31c64dcb9ba03e8cc0de42c38d138fcae17279d0eb4766e82bf1ed34c684"
|
||||
},
|
||||
{
|
||||
"path": "templates/scheduled-workflow.ts",
|
||||
"sha256": "ca55491a2ee0d97f791d6017c56a030090718baf6e21809d15339b82e9989881"
|
||||
},
|
||||
{
|
||||
"path": "templates/workflow-with-retries.ts",
|
||||
"sha256": "f80370e3bdd64a34efc7d1ae506779806e32499dfc009f8002a49cccd8cb969d"
|
||||
},
|
||||
{
|
||||
"path": "templates/worker-trigger.ts",
|
||||
"sha256": "bead780a84a8e3dae466b0008ccdf6bfda7eebd34ba70906dcb6e356bb82fc4d"
|
||||
},
|
||||
{
|
||||
"path": "templates/basic-workflow.ts",
|
||||
"sha256": "e32b3b5b671a6897af32b7f7a5bf640a30f9b0c9094ba5e3822cb5ad7fd5bdb5"
|
||||
},
|
||||
{
|
||||
"path": "templates/workflow-with-events.ts",
|
||||
"sha256": "7644d8a6a9600365f86045d7bc391ff6cc3eca755f152cfc2a0dded9cfa5dc78"
|
||||
}
|
||||
],
|
||||
"dirSha256": "928c2023d9f2b055674faca59bf755b330124301b94c312810ffb099a294812e"
|
||||
},
|
||||
"security": {
|
||||
"scannedAt": null,
|
||||
"scannerVersion": null,
|
||||
"flags": []
|
||||
}
|
||||
}
|
||||
413
references/common-issues.md
Normal file
413
references/common-issues.md
Normal file
@@ -0,0 +1,413 @@
|
||||
# Cloudflare Workflows - Common Issues
|
||||
|
||||
**Last Updated**: 2025-10-22
|
||||
|
||||
This document details all known issues with Cloudflare Workflows and their solutions.
|
||||
|
||||
---
|
||||
|
||||
## Issue #1: I/O Context Error
|
||||
|
||||
**Error Message:**
|
||||
```
|
||||
Cannot perform I/O on behalf of a different request
|
||||
```
|
||||
|
||||
**Description:**
|
||||
When trying to use I/O objects (like fetch responses, file handles, etc.) created in one request context from a different request's handler, Cloudflare Workers throws this error. This is a fundamental Workers platform limitation.
|
||||
|
||||
**Root Cause:**
|
||||
I/O objects are bound to the request context that created them. Workflows create a new execution context for each step, so I/O must happen within the step's callback.
|
||||
|
||||
**Prevention:**
|
||||
|
||||
❌ **Bad - I/O outside step:**
|
||||
```typescript
|
||||
// This will fail!
|
||||
const response = await fetch('https://api.example.com/data');
|
||||
const data = await response.json();
|
||||
|
||||
await step.do('use data', async () => {
|
||||
// Trying to use data from outside step's context
|
||||
return data; // ❌ Error!
|
||||
});
|
||||
```
|
||||
|
||||
✅ **Good - I/O inside step:**
|
||||
```typescript
|
||||
const data = await step.do('fetch data', async () => {
|
||||
const response = await fetch('https://api.example.com/data');
|
||||
return await response.json(); // ✅ Correct
|
||||
});
|
||||
```
|
||||
|
||||
**Workaround:**
|
||||
Always perform all I/O operations (fetch, KV reads, D1 queries, R2 operations) within `step.do()` callbacks.
|
||||
|
||||
**Source:** Cloudflare Workers platform limitation
|
||||
|
||||
---
|
||||
|
||||
## Issue #2: NonRetryableError Behaves Differently in Dev vs Production
|
||||
|
||||
**Error Message:**
|
||||
```
|
||||
(No specific error - workflow retries when it shouldn't)
|
||||
```
|
||||
|
||||
**Description:**
|
||||
When throwing a `NonRetryableError` with an empty message in development mode (`wrangler dev`), the workflow incorrectly retries the failed step. In production, it correctly exits without retrying.
|
||||
|
||||
**Root Cause:**
|
||||
Bug in the development environment handling of empty NonRetryableError messages.
|
||||
|
||||
**Prevention:**
|
||||
|
||||
❌ **Bad - Empty message:**
|
||||
```typescript
|
||||
import { NonRetryableError } from 'cloudflare:workflows';
|
||||
|
||||
// May retry in dev mode
|
||||
throw new NonRetryableError();
|
||||
```
|
||||
|
||||
✅ **Good - Always provide message:**
|
||||
```typescript
|
||||
import { NonRetryableError } from 'cloudflare:workflows';
|
||||
|
||||
// Works consistently in dev and production
|
||||
throw new NonRetryableError('User not found');
|
||||
throw new NonRetryableError('Invalid authentication credentials');
|
||||
throw new NonRetryableError('Amount exceeds limit');
|
||||
```
|
||||
|
||||
**Workaround:**
|
||||
Always provide a descriptive message when throwing NonRetryableError.
|
||||
|
||||
**Source:** [cloudflare/workers-sdk#10113](https://github.com/cloudflare/workers-sdk/issues/10113)
|
||||
**Status:** Reported July 2025, not yet fixed
|
||||
|
||||
---
|
||||
|
||||
## Issue #3: WorkflowEvent Export Not Found
|
||||
|
||||
**Error Message:**
|
||||
```
|
||||
The requested module 'cloudflare:workers' does not provide an export named 'WorkflowEvent'
|
||||
```
|
||||
|
||||
**Description:**
|
||||
TypeScript cannot find the `WorkflowEvent` export from the `cloudflare:workers` module. This usually happens with outdated type definitions.
|
||||
|
||||
**Root Cause:**
|
||||
- Outdated `@cloudflare/workers-types` package
|
||||
- Incorrect import statement
|
||||
- Missing types in tsconfig.json
|
||||
|
||||
**Prevention:**
|
||||
|
||||
✅ **Ensure latest types installed:**
|
||||
```bash
|
||||
npm install -D @cloudflare/workers-types@latest
|
||||
```
|
||||
|
||||
✅ **Correct import:**
|
||||
```typescript
|
||||
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
|
||||
import { NonRetryableError } from 'cloudflare:workflows';
|
||||
```
|
||||
|
||||
✅ **Correct tsconfig.json:**
|
||||
```json
|
||||
{
|
||||
"compilerOptions": {
|
||||
"types": ["@cloudflare/workers-types/2023-07-01"],
|
||||
"moduleResolution": "bundler"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Workaround:**
|
||||
1. Update workers types: `npm install -D @cloudflare/workers-types@latest`
|
||||
2. Run type generation: `npx wrangler types`
|
||||
3. Restart TypeScript server in your editor
|
||||
|
||||
**Source:** Community reports, package versioning issues
|
||||
**Latest Working Version:** @cloudflare/workers-types@4.20251014.0 (verified 2025-10-22)
|
||||
|
||||
---
|
||||
|
||||
## Issue #4: Serialization Error - Non-Serializable Return Values
|
||||
|
||||
**Error Message:**
|
||||
```
|
||||
Error: Could not serialize return value
|
||||
(or workflow hangs without clear error)
|
||||
```
|
||||
|
||||
**Description:**
|
||||
Attempting to return non-serializable values from `step.do()` or `run()` methods causes serialization failures. The workflow instance may error or hang.
|
||||
|
||||
**Root Cause:**
|
||||
Workflows persist state between steps by serializing return values. Only JSON-serializable types are supported.
|
||||
|
||||
**Prevention:**
|
||||
|
||||
❌ **Bad - Non-serializable types:**
|
||||
```typescript
|
||||
// ❌ Function
|
||||
await step.do('bad example', async () => {
|
||||
return {
|
||||
data: [1, 2, 3],
|
||||
transform: (x) => x * 2 // ❌ Function not serializable
|
||||
};
|
||||
});
|
||||
|
||||
// ❌ Circular reference
|
||||
await step.do('bad example 2', async () => {
|
||||
const obj: any = { name: 'test' };
|
||||
obj.self = obj; // ❌ Circular reference
|
||||
return obj;
|
||||
});
|
||||
|
||||
// ❌ Symbol
|
||||
await step.do('bad example 3', async () => {
|
||||
return {
|
||||
id: Symbol('unique'), // ❌ Symbol not serializable
|
||||
data: 'test'
|
||||
};
|
||||
});
|
||||
|
||||
// ❌ undefined (use null instead)
|
||||
await step.do('bad example 4', async () => {
|
||||
return {
|
||||
value: undefined // ❌ undefined not serializable
|
||||
};
|
||||
});
|
||||
```
|
||||
|
||||
✅ **Good - Only serializable types:**
|
||||
```typescript
|
||||
await step.do('good example', async () => {
|
||||
return {
|
||||
// ✅ Primitives
|
||||
string: 'value',
|
||||
number: 123,
|
||||
boolean: true,
|
||||
nullValue: null,
|
||||
|
||||
// ✅ Arrays
|
||||
array: [1, 2, 3],
|
||||
|
||||
// ✅ Objects
|
||||
nested: {
|
||||
data: 'test',
|
||||
items: [{ id: 1 }, { id: 2 }]
|
||||
}
|
||||
};
|
||||
});
|
||||
```
|
||||
|
||||
✅ **Convert class instances to plain objects:**
|
||||
```typescript
|
||||
class User {
|
||||
constructor(public id: string, public name: string) {}
|
||||
|
||||
toJSON() {
|
||||
return { id: this.id, name: this.name };
|
||||
}
|
||||
}
|
||||
|
||||
await step.do('serialize class', async () => {
|
||||
const user = new User('123', 'Alice');
|
||||
|
||||
// ✅ Convert to plain object
|
||||
return user.toJSON(); // { id: '123', name: 'Alice' }
|
||||
});
|
||||
```
|
||||
|
||||
**Workaround:**
|
||||
- Only return primitives, arrays, and plain objects
|
||||
- Convert class instances to plain objects before returning
|
||||
- Use `null` instead of `undefined`
|
||||
- Avoid circular references
|
||||
|
||||
**Source:** Cloudflare Workflows documentation
|
||||
**Reference:** [Workflows Workers API](https://developers.cloudflare.com/workflows/build/workers-api/)
|
||||
|
||||
---
|
||||
|
||||
## Issue #5: Testing Workflows in CI Environments
|
||||
|
||||
**Error Message:**
|
||||
```
|
||||
(Tests pass locally but fail in CI)
|
||||
```
|
||||
|
||||
**Description:**
|
||||
Tests that use `vitest-pool-workers` to test workflows work reliably in local development but fail inconsistently in CI environments (GitHub Actions, GitLab CI, etc.).
|
||||
|
||||
**Root Cause:**
|
||||
- Timing issues in CI environments
|
||||
- Resource constraints in CI runners
|
||||
- Race conditions in test setup/teardown
|
||||
|
||||
**Prevention:**
|
||||
|
||||
✅ **Increase timeouts in CI:**
|
||||
```typescript
|
||||
// vitest.config.ts
|
||||
import { defineConfig } from 'vitest/config';
|
||||
|
||||
export default defineConfig({
|
||||
test: {
|
||||
testTimeout: 30000, // Increase from default 5000ms
|
||||
poolOptions: {
|
||||
workers: {
|
||||
wrangler: { configPath: './wrangler.jsonc' },
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
✅ **Add retry logic for flaky tests:**
|
||||
```typescript
|
||||
describe('Workflow tests', () => {
|
||||
it.retry(3)('should complete workflow', async () => {
|
||||
// Test code
|
||||
});
|
||||
});
|
||||
```
|
||||
|
||||
✅ **Use proper test isolation:**
|
||||
```typescript
|
||||
import { beforeEach, afterEach } from 'vitest';
|
||||
|
||||
let instance: WorkflowInstance;
|
||||
|
||||
beforeEach(async () => {
|
||||
instance = await env.MY_WORKFLOW.create({
|
||||
params: { userId: '123' }
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (instance) {
|
||||
try {
|
||||
await instance.terminate();
|
||||
} catch (error) {
|
||||
// Instance may already be terminated
|
||||
}
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
**Workaround:**
|
||||
1. Increase test timeouts for CI
|
||||
2. Add retry logic for flaky tests
|
||||
3. Use proper test isolation
|
||||
4. Consider mocking workflows in unit tests, testing real workflows in integration tests
|
||||
|
||||
**Source:** [cloudflare/workers-sdk#10600](https://github.com/cloudflare/workers-sdk/issues/10600)
|
||||
**Status:** Ongoing investigation
|
||||
|
||||
---
|
||||
|
||||
## Additional Troubleshooting Tips
|
||||
|
||||
### Workflow Instance Stuck in "Running" State
|
||||
|
||||
**Possible Causes:**
|
||||
1. Step is sleeping for long duration
|
||||
2. Step is waiting for event that never arrives
|
||||
3. Step is retrying with long backoff
|
||||
|
||||
**Solution:**
|
||||
```bash
|
||||
# Check detailed instance status
|
||||
npx wrangler workflows instances describe my-workflow <instance-id>
|
||||
|
||||
# Look for:
|
||||
# - Sleep state (shows wake time)
|
||||
# - waitForEvent state (shows event type and timeout)
|
||||
# - Retry history (shows attempts and delays)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Step Returns Undefined
|
||||
|
||||
**Cause:** Missing return statement in step callback
|
||||
|
||||
**Solution:**
|
||||
```typescript
|
||||
// ❌ Bad - no return
|
||||
const result = await step.do('get data', async () => {
|
||||
const data = await fetchData();
|
||||
// Missing return!
|
||||
});
|
||||
console.log(result); // undefined
|
||||
|
||||
// ✅ Good - explicit return
|
||||
const result = await step.do('get data', async () => {
|
||||
const data = await fetchData();
|
||||
return data; // ✅ Return the value
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Payload Too Large Error
|
||||
|
||||
**Error:**
|
||||
```
|
||||
Payload size exceeds limit
|
||||
```
|
||||
|
||||
**Cause:** Workflow parameters or step outputs exceed 128 KB
|
||||
|
||||
**Solution:**
|
||||
```typescript
|
||||
// ❌ Bad - large payload
|
||||
await env.MY_WORKFLOW.create({
|
||||
params: {
|
||||
largeData: hugeArray // >128 KB
|
||||
}
|
||||
});
|
||||
|
||||
// ✅ Good - store in R2/KV, pass reference
|
||||
const key = `workflow-data/${crypto.randomUUID()}`;
|
||||
await env.MY_BUCKET.put(key, JSON.stringify(hugeArray));
|
||||
|
||||
await env.MY_WORKFLOW.create({
|
||||
params: {
|
||||
dataKey: key // Just pass the key
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Getting Help
|
||||
|
||||
If you encounter issues not listed here:
|
||||
|
||||
1. **Check Cloudflare Status**: https://www.cloudflarestatus.com/
|
||||
2. **Search GitHub Issues**: https://github.com/cloudflare/workers-sdk/issues
|
||||
3. **Cloudflare Discord**: https://discord.gg/cloudflaredev
|
||||
4. **Cloudflare Community**: https://community.cloudflare.com/
|
||||
5. **Official Docs**: https://developers.cloudflare.com/workflows/
|
||||
|
||||
When reporting issues, include:
|
||||
- Workflow code (sanitized)
|
||||
- Wrangler configuration
|
||||
- Error messages and stack traces
|
||||
- Workflow instance ID
|
||||
- Steps to reproduce
|
||||
- Expected vs actual behavior
|
||||
|
||||
---
|
||||
|
||||
**Last Updated**: 2025-10-22
|
||||
**Maintainer**: Jeremy Dawes | jeremy@jezweb.net
|
||||
585
references/workflow-patterns.md
Normal file
585
references/workflow-patterns.md
Normal file
@@ -0,0 +1,585 @@
|
||||
# Cloudflare Workflows - Production Patterns
|
||||
|
||||
**Last Updated**: 2025-10-22
|
||||
|
||||
This document provides battle-tested patterns for building production-ready Cloudflare Workflows.
|
||||
|
||||
---
|
||||
|
||||
## Table of Contents
|
||||
|
||||
1. [Idempotency Patterns](#idempotency-patterns)
|
||||
2. [Error Handling Patterns](#error-handling-patterns)
|
||||
3. [Long-Running Process Patterns](#long-running-process-patterns)
|
||||
4. [Human-in-the-Loop Patterns](#human-in-the-loop-patterns)
|
||||
5. [Workflow Chaining Patterns](#workflow-chaining-patterns)
|
||||
6. [Testing Patterns](#testing-patterns)
|
||||
7. [Monitoring Patterns](#monitoring-patterns)
|
||||
|
||||
---
|
||||
|
||||
## Idempotency Patterns
|
||||
|
||||
### Pattern 1: Idempotency Keys
|
||||
|
||||
**Problem:** Workflow steps may execute multiple times due to retries.
|
||||
|
||||
**Solution:** Use idempotency keys to ensure operations execute only once.
|
||||
|
||||
```typescript
|
||||
export class PaymentWorkflow extends WorkflowEntrypoint<Env, PaymentParams> {
|
||||
async run(event: WorkflowEvent<PaymentParams>, step: WorkflowStep) {
|
||||
const { orderId, amount } = event.payload;
|
||||
|
||||
// Generate idempotency key from workflow instance ID + step name
|
||||
const idempotencyKey = `${event.instanceId}-charge-payment`;
|
||||
|
||||
const paymentResult = await step.do('charge payment', async () => {
|
||||
// Check if already processed
|
||||
const existing = await this.env.KV.get(`payment:${idempotencyKey}`);
|
||||
if (existing) {
|
||||
console.log('Payment already processed, returning cached result');
|
||||
return JSON.parse(existing);
|
||||
}
|
||||
|
||||
// Process payment
|
||||
const response = await fetch('https://payment-gateway.example.com/charge', {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Idempotency-Key': idempotencyKey // Payment gateway checks this
|
||||
},
|
||||
body: JSON.stringify({ orderId, amount })
|
||||
});
|
||||
|
||||
const result = await response.json();
|
||||
|
||||
// Cache result
|
||||
await this.env.KV.put(
|
||||
`payment:${idempotencyKey}`,
|
||||
JSON.stringify(result),
|
||||
{ expirationTtl: 86400 } // 24 hours
|
||||
});
|
||||
|
||||
return result;
|
||||
});
|
||||
|
||||
return { orderId, transactionId: paymentResult.transactionId };
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Pattern 2: Database Upsert for Idempotency
|
||||
|
||||
```typescript
|
||||
await step.do('create order', async () => {
|
||||
// Use INSERT OR REPLACE to make idempotent
|
||||
await this.env.DB.prepare(`
|
||||
INSERT INTO orders (id, user_id, amount, status, created_at)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
user_id = excluded.user_id,
|
||||
amount = excluded.amount,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
`).bind(
|
||||
orderId,
|
||||
userId,
|
||||
amount,
|
||||
'pending',
|
||||
new Date().toISOString()
|
||||
).run();
|
||||
|
||||
return { orderId };
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Error Handling Patterns
|
||||
|
||||
### Pattern 1: Categorize Errors for Retry Logic
|
||||
|
||||
```typescript
|
||||
async function shouldRetry(error: Error): Promise<boolean> {
|
||||
// Don't retry on client errors (4xx)
|
||||
if (error.message.includes('400') ||
|
||||
error.message.includes('401') ||
|
||||
error.message.includes('403') ||
|
||||
error.message.includes('404')) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Retry on server errors (5xx) and network errors
|
||||
return true;
|
||||
}
|
||||
|
||||
await step.do('call API', async () => {
|
||||
try {
|
||||
const response = await fetch('https://api.example.com/data');
|
||||
|
||||
if (!response.ok) {
|
||||
const error = new Error(`API error: ${response.status}`);
|
||||
|
||||
if (!await shouldRetry(error)) {
|
||||
throw new NonRetryableError(error.message);
|
||||
}
|
||||
|
||||
throw error; // Will retry
|
||||
}
|
||||
|
||||
return await response.json();
|
||||
} catch (error) {
|
||||
if (error instanceof NonRetryableError) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Network error - retry
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Pattern 2: Circuit Breaker
|
||||
|
||||
```typescript
|
||||
export class CircuitBreaker {
|
||||
constructor(
|
||||
private kv: KVNamespace,
|
||||
private serviceName: string,
|
||||
private threshold: number = 5,
|
||||
private resetTime: number = 60000 // 1 minute
|
||||
) {}
|
||||
|
||||
async call<T>(fn: () => Promise<T>): Promise<T> {
|
||||
const key = `circuit:${this.serviceName}`;
|
||||
const state = await this.kv.get(key, 'json') as {
|
||||
failures: number;
|
||||
lastFailure: number;
|
||||
} | null;
|
||||
|
||||
// Check if circuit is open
|
||||
if (state && state.failures >= this.threshold) {
|
||||
const elapsed = Date.now() - state.lastFailure;
|
||||
|
||||
if (elapsed < this.resetTime) {
|
||||
throw new NonRetryableError(
|
||||
`Circuit breaker open for ${this.serviceName}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await fn();
|
||||
|
||||
// Reset on success
|
||||
await this.kv.delete(key);
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
// Increment failure count
|
||||
const newState = {
|
||||
failures: (state?.failures || 0) + 1,
|
||||
lastFailure: Date.now()
|
||||
};
|
||||
|
||||
await this.kv.put(key, JSON.stringify(newState), {
|
||||
expirationTtl: this.resetTime / 1000
|
||||
});
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Usage
|
||||
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
|
||||
const circuitBreaker = new CircuitBreaker(this.env.KV, 'external-api');
|
||||
|
||||
await step.do('call external API', async () => {
|
||||
return await circuitBreaker.call(async () => {
|
||||
const response = await fetch('https://external-api.example.com/data');
|
||||
return await response.json();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Pattern 3: Graceful Degradation
|
||||
|
||||
```typescript
|
||||
await step.do('fetch user preferences', async () => {
|
||||
try {
|
||||
const response = await fetch(`https://api.example.com/users/${userId}/preferences`);
|
||||
if (!response.ok) throw new Error('Failed to fetch preferences');
|
||||
return await response.json();
|
||||
} catch (error) {
|
||||
console.warn('Failed to fetch preferences, using defaults:', error);
|
||||
|
||||
// Fallback to defaults
|
||||
return {
|
||||
theme: 'light',
|
||||
language: 'en',
|
||||
notifications: true
|
||||
};
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Long-Running Process Patterns
|
||||
|
||||
### Pattern 1: Polling with Exponential Backoff
|
||||
|
||||
```typescript
|
||||
export class VideoProcessingWorkflow extends WorkflowEntrypoint<Env, VideoParams> {
|
||||
async run(event: WorkflowEvent<VideoParams>, step: WorkflowStep) {
|
||||
const { videoId } = event.payload;
|
||||
|
||||
// Submit video for processing
|
||||
const jobId = await step.do('submit video', async () => {
|
||||
const response = await fetch('https://processor.example.com/jobs', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ videoId })
|
||||
});
|
||||
const data = await response.json();
|
||||
return data.jobId;
|
||||
});
|
||||
|
||||
// Poll for completion with exponential backoff
|
||||
let complete = false;
|
||||
let attempt = 0;
|
||||
const maxAttempts = 20;
|
||||
|
||||
while (!complete && attempt < maxAttempts) {
|
||||
// Wait with exponential backoff: 10s, 20s, 40s, ...
|
||||
const delay = Math.min(10 * Math.pow(2, attempt), 300); // Max 5 minutes
|
||||
await step.sleep(`wait attempt ${attempt}`, `${delay} seconds`);
|
||||
|
||||
const status = await step.do(`check status attempt ${attempt}`, async () => {
|
||||
const response = await fetch(
|
||||
`https://processor.example.com/jobs/${jobId}/status`
|
||||
);
|
||||
return await response.json();
|
||||
});
|
||||
|
||||
if (status.state === 'complete') {
|
||||
complete = true;
|
||||
} else if (status.state === 'failed') {
|
||||
throw new NonRetryableError(`Processing failed: ${status.error}`);
|
||||
}
|
||||
|
||||
attempt++;
|
||||
}
|
||||
|
||||
if (!complete) {
|
||||
throw new Error('Processing timeout after maximum attempts');
|
||||
}
|
||||
|
||||
return { videoId, jobId, status: 'complete' };
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Pattern 2: Progress Tracking
|
||||
|
||||
```typescript
|
||||
export class DataMigrationWorkflow extends WorkflowEntrypoint<Env, MigrationParams> {
|
||||
async run(event: WorkflowEvent<MigrationParams>, step: WorkflowStep) {
|
||||
const { totalRecords, batchSize } = event.payload;
|
||||
const batches = Math.ceil(totalRecords / batchSize);
|
||||
|
||||
for (let i = 0; i < batches; i++) {
|
||||
const progress = await step.do(`migrate batch ${i}`, async () => {
|
||||
const offset = i * batchSize;
|
||||
|
||||
// Migrate batch
|
||||
await this.migrateBatch(offset, batchSize);
|
||||
|
||||
// Update progress in DB
|
||||
const percentage = Math.round(((i + 1) / batches) * 100);
|
||||
await this.env.DB.prepare(`
|
||||
UPDATE migration_jobs
|
||||
SET progress = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
`).bind(percentage, new Date().toISOString(), event.payload.jobId).run();
|
||||
|
||||
return { batch: i + 1, total: batches, percentage };
|
||||
});
|
||||
|
||||
console.log(`Progress: ${progress.percentage}%`);
|
||||
|
||||
// Small delay between batches to avoid overwhelming database
|
||||
if (i < batches - 1) {
|
||||
await step.sleep(`pause before batch ${i + 1}`, '1 second');
|
||||
}
|
||||
}
|
||||
|
||||
return { status: 'complete', batches };
|
||||
}
|
||||
|
||||
private async migrateBatch(offset: number, limit: number) {
|
||||
// Migration logic
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Human-in-the-Loop Patterns
|
||||
|
||||
### Pattern 1: Approval with Timeout and Escalation
|
||||
|
||||
```typescript
|
||||
export class ApprovalWorkflow extends WorkflowEntrypoint<Env, ApprovalParams> {
|
||||
async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
|
||||
const { requestId, amount } = event.payload;
|
||||
|
||||
// Send to primary approver
|
||||
await step.do('notify primary approver', async () => {
|
||||
await this.sendApprovalRequest('manager@example.com', requestId);
|
||||
});
|
||||
|
||||
// Wait 48 hours for approval
|
||||
let approved: boolean;
|
||||
let approver: string;
|
||||
|
||||
try {
|
||||
const decision = await step.waitForEvent<ApprovalEvent>(
|
||||
'wait for primary approval',
|
||||
{ type: 'approval-decision', timeout: '48 hours' }
|
||||
);
|
||||
|
||||
approved = decision.approved;
|
||||
approver = decision.approverId;
|
||||
} catch (error) {
|
||||
// Timeout - escalate to senior manager
|
||||
console.log('Primary approval timeout, escalating');
|
||||
|
||||
await step.do('notify senior approver', async () => {
|
||||
await this.sendApprovalRequest('senior-manager@example.com', requestId);
|
||||
});
|
||||
|
||||
// Wait another 24 hours
|
||||
const escalatedDecision = await step.waitForEvent<ApprovalEvent>(
|
||||
'wait for escalated approval',
|
||||
{ type: 'approval-decision', timeout: '24 hours' }
|
||||
);
|
||||
|
||||
approved = escalatedDecision.approved;
|
||||
approver = escalatedDecision.approverId;
|
||||
}
|
||||
|
||||
if (approved) {
|
||||
await step.do('execute approved action', async () => {
|
||||
// Execute the action
|
||||
});
|
||||
}
|
||||
|
||||
return { requestId, approved, approver };
|
||||
}
|
||||
|
||||
private async sendApprovalRequest(to: string, requestId: string) {
|
||||
// Send notification
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Workflow Chaining Patterns
|
||||
|
||||
### Pattern 1: Parent-Child Workflows
|
||||
|
||||
```typescript
|
||||
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
|
||||
async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
|
||||
const { orderId } = event.payload;
|
||||
|
||||
// Step 1: Process payment (separate workflow)
|
||||
const paymentWorkflow = await step.do('start payment workflow', async () => {
|
||||
const instance = await this.env.PAYMENT_WORKFLOW.create({
|
||||
params: { orderId, amount: event.payload.amount }
|
||||
});
|
||||
return { instanceId: instance.id };
|
||||
});
|
||||
|
||||
// Step 2: Wait for payment to complete
|
||||
let paymentComplete = false;
|
||||
|
||||
while (!paymentComplete) {
|
||||
await step.sleep('wait for payment', '30 seconds');
|
||||
|
||||
const paymentStatus = await step.do('check payment status', async () => {
|
||||
const instance = await this.env.PAYMENT_WORKFLOW.get(
|
||||
paymentWorkflow.instanceId
|
||||
);
|
||||
return await instance.status();
|
||||
});
|
||||
|
||||
if (paymentStatus.status === 'complete') {
|
||||
paymentComplete = true;
|
||||
} else if (paymentStatus.status === 'errored') {
|
||||
throw new Error(`Payment failed: ${paymentStatus.error}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: Start fulfillment workflow
|
||||
await step.do('start fulfillment workflow', async () => {
|
||||
await this.env.FULFILLMENT_WORKFLOW.create({
|
||||
params: { orderId }
|
||||
});
|
||||
});
|
||||
|
||||
return { orderId, status: 'processing' };
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Testing Patterns
|
||||
|
||||
### Pattern 1: Mock External APIs
|
||||
|
||||
```typescript
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import { unstable_dev } from 'wrangler';
|
||||
|
||||
describe('PaymentWorkflow', () => {
|
||||
let worker;
|
||||
|
||||
beforeEach(async () => {
|
||||
worker = await unstable_dev('src/index.ts', {
|
||||
experimental: { disableExperimentalWarning: true }
|
||||
});
|
||||
});
|
||||
|
||||
it('should process payment successfully', async () => {
|
||||
// Mock fetch to return success
|
||||
globalThis.fetch = async (url: string) => {
|
||||
if (url.includes('payment-gateway')) {
|
||||
return new Response(JSON.stringify({
|
||||
transactionId: 'TXN-123',
|
||||
status: 'success'
|
||||
}));
|
||||
}
|
||||
return new Response('Not found', { status: 404 });
|
||||
};
|
||||
|
||||
const response = await worker.fetch('/workflows/create', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
orderId: 'ORD-123',
|
||||
amount: 99.99
|
||||
})
|
||||
});
|
||||
|
||||
const data = await response.json();
|
||||
expect(data.id).toBeDefined();
|
||||
});
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Monitoring Patterns
|
||||
|
||||
### Pattern 1: Structured Logging
|
||||
|
||||
```typescript
|
||||
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
|
||||
this.log('info', 'Workflow started', {
|
||||
instanceId: event.instanceId,
|
||||
params: event.payload
|
||||
});
|
||||
|
||||
try {
|
||||
await step.do('process data', async () => {
|
||||
this.log('info', 'Processing data', { userId: event.payload.userId });
|
||||
// Process
|
||||
return { processed: true };
|
||||
});
|
||||
|
||||
this.log('info', 'Workflow completed successfully', {
|
||||
instanceId: event.instanceId
|
||||
});
|
||||
} catch (error) {
|
||||
this.log('error', 'Workflow failed', {
|
||||
instanceId: event.instanceId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private log(level: string, message: string, data: any) {
|
||||
console.log(JSON.stringify({
|
||||
level,
|
||||
message,
|
||||
timestamp: new Date().toISOString(),
|
||||
...data
|
||||
}));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Pattern 2: Metrics Tracking
|
||||
|
||||
```typescript
|
||||
await step.do('track metrics', async () => {
|
||||
const metrics = {
|
||||
workflowId: event.instanceId,
|
||||
stepName: 'payment-processing',
|
||||
duration: performance.now() - startTime,
|
||||
status: 'success',
|
||||
timestamp: new Date().toISOString()
|
||||
};
|
||||
|
||||
// Store in Analytics Engine
|
||||
await this.env.ANALYTICS.writeDataPoint(metrics);
|
||||
|
||||
return metrics;
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Best Practices Summary
|
||||
|
||||
### Always Do
|
||||
|
||||
1. **Use idempotency keys** for external API calls
|
||||
2. **Categorize errors** - retry on transient failures, fail fast on terminal errors
|
||||
3. **Log structured data** - JSON logs for easy querying
|
||||
4. **Track progress** - update database for long-running processes
|
||||
5. **Use exponential backoff** - for polling and retries
|
||||
6. **Test workflows** - unit tests with mocked dependencies
|
||||
7. **Monitor metrics** - track success rates, durations, errors
|
||||
|
||||
### Never Do
|
||||
|
||||
1. **Don't retry non-idempotent operations infinitely** - use retry limits
|
||||
2. **Don't ignore timeout errors** - handle gracefully with fallbacks
|
||||
3. **Don't block on external events without timeout** - always set timeout
|
||||
4. **Don't assume steps execute in order** - each step is independent
|
||||
5. **Don't return non-serializable values** - only JSON-compatible types
|
||||
6. **Don't store sensitive data in workflow state** - use KV/D1 instead
|
||||
7. **Don't forget to clean up resources** - terminate unused workflow instances
|
||||
|
||||
---
|
||||
|
||||
**Last Updated**: 2025-10-22
|
||||
**Maintainer**: Jeremy Dawes | jeremy@jezweb.net
|
||||
136
templates/basic-workflow.ts
Normal file
136
templates/basic-workflow.ts
Normal file
@@ -0,0 +1,136 @@
|
||||
/**
|
||||
* Basic Cloudflare Workflow Example
|
||||
*
|
||||
* Demonstrates:
|
||||
* - WorkflowEntrypoint class
|
||||
* - step.do() for executing work
|
||||
* - step.sleep() for delays
|
||||
* - Accessing environment bindings
|
||||
* - Returning state from workflow
|
||||
*/
|
||||
|
||||
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
|
||||
|
||||
// Define environment bindings
|
||||
type Env = {
|
||||
MY_WORKFLOW: Workflow;
|
||||
// Add your bindings here:
|
||||
// MY_KV: KVNamespace;
|
||||
// DB: D1Database;
|
||||
// MY_BUCKET: R2Bucket;
|
||||
};
|
||||
|
||||
// Define workflow parameters
|
||||
type Params = {
|
||||
userId: string;
|
||||
email: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Basic Workflow
|
||||
*
|
||||
* Three-step workflow that:
|
||||
* 1. Fetches user data
|
||||
* 2. Processes user data
|
||||
* 3. Sends notification
|
||||
*/
|
||||
export class BasicWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
|
||||
// Access parameters from event.payload
|
||||
const { userId, email } = event.payload;
|
||||
|
||||
console.log(`Starting workflow for user ${userId}`);
|
||||
|
||||
// Step 1: Fetch user data
|
||||
const userData = await step.do('fetch user data', async () => {
|
||||
// Example: Fetch from external API
|
||||
const response = await fetch(`https://api.example.com/users/${userId}`);
|
||||
const data = await response.json();
|
||||
|
||||
return {
|
||||
id: data.id,
|
||||
name: data.name,
|
||||
email: data.email,
|
||||
preferences: data.preferences
|
||||
};
|
||||
});
|
||||
|
||||
console.log(`Fetched user: ${userData.name}`);
|
||||
|
||||
// Step 2: Process user data
|
||||
const processedData = await step.do('process user data', async () => {
|
||||
// Example: Perform some computation
|
||||
return {
|
||||
userId: userData.id,
|
||||
processedAt: new Date().toISOString(),
|
||||
status: 'processed'
|
||||
};
|
||||
});
|
||||
|
||||
// Step 3: Wait before sending notification
|
||||
await step.sleep('wait before notification', '5 minutes');
|
||||
|
||||
// Step 4: Send notification
|
||||
await step.do('send notification', async () => {
|
||||
// Example: Send email or push notification
|
||||
await fetch('https://api.example.com/notifications', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
to: email,
|
||||
subject: 'Processing Complete',
|
||||
body: `Your data has been processed at ${processedData.processedAt}`
|
||||
})
|
||||
});
|
||||
|
||||
return { sent: true, timestamp: Date.now() };
|
||||
});
|
||||
|
||||
// Return final state (must be serializable)
|
||||
return {
|
||||
userId,
|
||||
status: 'complete',
|
||||
processedAt: processedData.processedAt
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker that triggers the workflow
|
||||
*/
|
||||
export default {
|
||||
async fetch(req: Request, env: Env): Promise<Response> {
|
||||
const url = new URL(req.url);
|
||||
|
||||
// Handle favicon
|
||||
if (url.pathname.startsWith('/favicon')) {
|
||||
return Response.json({}, { status: 404 });
|
||||
}
|
||||
|
||||
// Get instance status if ID provided
|
||||
const instanceId = url.searchParams.get('instanceId');
|
||||
if (instanceId) {
|
||||
const instance = await env.MY_WORKFLOW.get(instanceId);
|
||||
const status = await instance.status();
|
||||
|
||||
return Response.json({
|
||||
id: instanceId,
|
||||
status
|
||||
});
|
||||
}
|
||||
|
||||
// Create new workflow instance
|
||||
const instance = await env.MY_WORKFLOW.create({
|
||||
params: {
|
||||
userId: '123',
|
||||
email: 'user@example.com'
|
||||
}
|
||||
});
|
||||
|
||||
return Response.json({
|
||||
id: instance.id,
|
||||
details: await instance.status(),
|
||||
statusUrl: `${url.origin}?instanceId=${instance.id}`
|
||||
});
|
||||
}
|
||||
};
|
||||
252
templates/scheduled-workflow.ts
Normal file
252
templates/scheduled-workflow.ts
Normal file
@@ -0,0 +1,252 @@
|
||||
/**
|
||||
* Scheduled Workflow Example
|
||||
*
|
||||
* Demonstrates:
|
||||
* - step.sleep() for relative delays
|
||||
* - step.sleepUntil() for absolute times
|
||||
* - Scheduling daily/weekly tasks
|
||||
* - Long-running processes
|
||||
*/
|
||||
|
||||
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
|
||||
|
||||
type Env = {
|
||||
MY_WORKFLOW: Workflow;
|
||||
DB: D1Database;
|
||||
};
|
||||
|
||||
type ReportParams = {
|
||||
reportType: 'daily' | 'weekly' | 'monthly';
|
||||
recipients: string[];
|
||||
};
|
||||
|
||||
/**
|
||||
* Scheduled Reporting Workflow
|
||||
*
|
||||
* Generates and sends reports on a schedule:
|
||||
* - Daily reports at 9am UTC
|
||||
* - Weekly reports on Monday 9am UTC
|
||||
* - Monthly reports on 1st of month 9am UTC
|
||||
*/
|
||||
export class ScheduledReportWorkflow extends WorkflowEntrypoint<Env, ReportParams> {
|
||||
async run(event: WorkflowEvent<ReportParams>, step: WorkflowStep) {
|
||||
const { reportType, recipients } = event.payload;
|
||||
|
||||
if (reportType === 'daily') {
|
||||
await this.runDailyReport(step, recipients);
|
||||
} else if (reportType === 'weekly') {
|
||||
await this.runWeeklyReport(step, recipients);
|
||||
} else if (reportType === 'monthly') {
|
||||
await this.runMonthlyReport(step, recipients);
|
||||
}
|
||||
|
||||
return { reportType, status: 'complete' };
|
||||
}
|
||||
|
||||
/**
|
||||
* Daily report - runs every day at 9am UTC
|
||||
*/
|
||||
private async runDailyReport(step: WorkflowStep, recipients: string[]) {
|
||||
// Calculate next 9am UTC
|
||||
const now = new Date();
|
||||
const next9am = new Date();
|
||||
next9am.setUTCDate(next9am.getUTCDate() + 1);
|
||||
next9am.setUTCHours(9, 0, 0, 0);
|
||||
|
||||
// Sleep until tomorrow 9am
|
||||
await step.sleepUntil('wait until 9am tomorrow', next9am);
|
||||
|
||||
// Generate report
|
||||
const report = await step.do('generate daily report', async () => {
|
||||
const yesterday = new Date(now);
|
||||
yesterday.setDate(yesterday.getDate() - 1);
|
||||
const dateStr = yesterday.toISOString().split('T')[0];
|
||||
|
||||
const results = await this.env.DB.prepare(
|
||||
'SELECT * FROM daily_metrics WHERE date = ?'
|
||||
).bind(dateStr).all();
|
||||
|
||||
return {
|
||||
date: dateStr,
|
||||
type: 'daily',
|
||||
metrics: results.results
|
||||
};
|
||||
});
|
||||
|
||||
// Send report
|
||||
await step.do('send daily report', async () => {
|
||||
await this.sendReport(report, recipients);
|
||||
return { sent: true };
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Weekly report - runs every Monday at 9am UTC
|
||||
*/
|
||||
private async runWeeklyReport(step: WorkflowStep, recipients: string[]) {
|
||||
// Calculate next Monday 9am UTC
|
||||
const nextMonday = new Date();
|
||||
const daysUntilMonday = (1 + 7 - nextMonday.getDay()) % 7 || 7;
|
||||
nextMonday.setDate(nextMonday.getDate() + daysUntilMonday);
|
||||
nextMonday.setUTCHours(9, 0, 0, 0);
|
||||
|
||||
await step.sleepUntil('wait until Monday 9am', nextMonday);
|
||||
|
||||
// Generate report
|
||||
const report = await step.do('generate weekly report', async () => {
|
||||
const lastWeek = new Date();
|
||||
lastWeek.setDate(lastWeek.getDate() - 7);
|
||||
|
||||
const results = await this.env.DB.prepare(
|
||||
'SELECT * FROM daily_metrics WHERE date >= ? ORDER BY date DESC'
|
||||
).bind(lastWeek.toISOString().split('T')[0]).all();
|
||||
|
||||
return {
|
||||
weekStart: lastWeek.toISOString().split('T')[0],
|
||||
type: 'weekly',
|
||||
metrics: results.results
|
||||
};
|
||||
});
|
||||
|
||||
// Send report
|
||||
await step.do('send weekly report', async () => {
|
||||
await this.sendReport(report, recipients);
|
||||
return { sent: true };
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Monthly report - runs on 1st of each month at 9am UTC
|
||||
*/
|
||||
private async runMonthlyReport(step: WorkflowStep, recipients: string[]) {
|
||||
// Calculate first day of next month at 9am UTC
|
||||
const firstOfNextMonth = new Date();
|
||||
firstOfNextMonth.setUTCMonth(firstOfNextMonth.getUTCMonth() + 1, 1);
|
||||
firstOfNextMonth.setUTCHours(9, 0, 0, 0);
|
||||
|
||||
await step.sleepUntil('wait until 1st of month 9am', firstOfNextMonth);
|
||||
|
||||
// Generate report
|
||||
const report = await step.do('generate monthly report', async () => {
|
||||
const lastMonth = new Date();
|
||||
lastMonth.setMonth(lastMonth.getMonth() - 1);
|
||||
const monthStart = new Date(lastMonth.getFullYear(), lastMonth.getMonth(), 1);
|
||||
const monthEnd = new Date(lastMonth.getFullYear(), lastMonth.getMonth() + 1, 0);
|
||||
|
||||
const results = await this.env.DB.prepare(
|
||||
'SELECT * FROM daily_metrics WHERE date >= ? AND date <= ? ORDER BY date DESC'
|
||||
).bind(
|
||||
monthStart.toISOString().split('T')[0],
|
||||
monthEnd.toISOString().split('T')[0]
|
||||
).all();
|
||||
|
||||
return {
|
||||
month: lastMonth.toISOString().substring(0, 7), // YYYY-MM
|
||||
type: 'monthly',
|
||||
metrics: results.results
|
||||
};
|
||||
});
|
||||
|
||||
// Send report
|
||||
await step.do('send monthly report', async () => {
|
||||
await this.sendReport(report, recipients);
|
||||
return { sent: true };
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send report via email
|
||||
*/
|
||||
private async sendReport(report: any, recipients: string[]) {
|
||||
const subject = `${report.type.charAt(0).toUpperCase() + report.type.slice(1)} Report`;
|
||||
|
||||
await fetch('https://api.example.com/send-email', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
to: recipients,
|
||||
subject,
|
||||
body: this.formatReport(report)
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Format report data as HTML
|
||||
*/
|
||||
private formatReport(report: any): string {
|
||||
// Format report metrics as HTML
|
||||
return `
|
||||
<h1>${report.type} Report</h1>
|
||||
<p>Period: ${report.date || report.weekStart || report.month}</p>
|
||||
<pre>${JSON.stringify(report.metrics, null, 2)}</pre>
|
||||
`;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Example: Reminder Workflow with Multiple Delays
|
||||
*/
|
||||
export class ReminderWorkflow extends WorkflowEntrypoint<Env, { userId: string; message: string }> {
|
||||
async run(event: WorkflowEvent<{ userId: string; message: string }>, step: WorkflowStep) {
|
||||
const { userId, message } = event.payload;
|
||||
|
||||
// Send initial reminder
|
||||
await step.do('send initial reminder', async () => {
|
||||
await this.sendReminder(userId, message);
|
||||
return { sent: true };
|
||||
});
|
||||
|
||||
// Wait 1 hour
|
||||
await step.sleep('wait 1 hour', '1 hour');
|
||||
|
||||
// Send second reminder
|
||||
await step.do('send second reminder', async () => {
|
||||
await this.sendReminder(userId, `Reminder: ${message}`);
|
||||
return { sent: true };
|
||||
});
|
||||
|
||||
// Wait 1 day
|
||||
await step.sleep('wait 1 day', '1 day');
|
||||
|
||||
// Send final reminder
|
||||
await step.do('send final reminder', async () => {
|
||||
await this.sendReminder(userId, `Final reminder: ${message}`);
|
||||
return { sent: true };
|
||||
});
|
||||
|
||||
return { userId, remindersSent: 3 };
|
||||
}
|
||||
|
||||
private async sendReminder(userId: string, message: string) {
|
||||
await fetch(`https://api.example.com/send-notification`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ userId, message })
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export default {
|
||||
async fetch(req: Request, env: Env): Promise<Response> {
|
||||
const url = new URL(req.url);
|
||||
|
||||
if (url.pathname.startsWith('/favicon')) {
|
||||
return Response.json({}, { status: 404 });
|
||||
}
|
||||
|
||||
// Create daily report workflow
|
||||
const instance = await env.MY_WORKFLOW.create({
|
||||
params: {
|
||||
reportType: 'daily',
|
||||
recipients: ['admin@example.com', 'team@example.com']
|
||||
}
|
||||
});
|
||||
|
||||
return Response.json({
|
||||
id: instance.id,
|
||||
status: await instance.status(),
|
||||
message: 'Daily report workflow scheduled'
|
||||
});
|
||||
}
|
||||
};
|
||||
287
templates/worker-trigger.ts
Normal file
287
templates/worker-trigger.ts
Normal file
@@ -0,0 +1,287 @@
|
||||
/**
|
||||
* Worker that Triggers and Manages Workflows
|
||||
*
|
||||
* Demonstrates:
|
||||
* - Creating workflow instances
|
||||
* - Querying workflow status
|
||||
* - Sending events to workflows
|
||||
* - Pausing/resuming workflows
|
||||
* - Terminating workflows
|
||||
*/
|
||||
|
||||
import { Hono } from 'hono';
|
||||
|
||||
type Bindings = {
|
||||
MY_WORKFLOW: Workflow;
|
||||
DB: D1Database;
|
||||
};
|
||||
|
||||
const app = new Hono<{ Bindings: Bindings }>();
|
||||
|
||||
/**
|
||||
* Create new workflow instance
|
||||
*/
|
||||
app.post('/workflows/create', async (c) => {
|
||||
const body = await c.req.json<{
|
||||
userId: string;
|
||||
email: string;
|
||||
[key: string]: any;
|
||||
}>();
|
||||
|
||||
try {
|
||||
// Create workflow instance with parameters
|
||||
const instance = await c.env.MY_WORKFLOW.create({
|
||||
params: body
|
||||
});
|
||||
|
||||
// Optionally store instance ID for later reference
|
||||
await c.env.DB.prepare(`
|
||||
INSERT INTO workflow_instances (id, user_id, status, created_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`).bind(
|
||||
instance.id,
|
||||
body.userId,
|
||||
'queued',
|
||||
new Date().toISOString()
|
||||
).run();
|
||||
|
||||
return c.json({
|
||||
id: instance.id,
|
||||
status: await instance.status(),
|
||||
createdAt: new Date().toISOString()
|
||||
}, 201);
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to create workflow',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get workflow instance status
|
||||
*/
|
||||
app.get('/workflows/:id', async (c) => {
|
||||
const instanceId = c.req.param('id');
|
||||
|
||||
try {
|
||||
const instance = await c.env.MY_WORKFLOW.get(instanceId);
|
||||
const status = await instance.status();
|
||||
|
||||
return c.json({
|
||||
id: instanceId,
|
||||
...status
|
||||
});
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Workflow not found',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 404);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Send event to waiting workflow
|
||||
*/
|
||||
app.post('/workflows/:id/events', async (c) => {
|
||||
const instanceId = c.req.param('id');
|
||||
const body = await c.req.json<{
|
||||
type: string;
|
||||
payload: any;
|
||||
}>();
|
||||
|
||||
try {
|
||||
const instance = await c.env.MY_WORKFLOW.get(instanceId);
|
||||
|
||||
await instance.sendEvent({
|
||||
type: body.type,
|
||||
payload: body.payload
|
||||
});
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
message: 'Event sent to workflow'
|
||||
});
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to send event',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Pause workflow instance
|
||||
*/
|
||||
app.post('/workflows/:id/pause', async (c) => {
|
||||
const instanceId = c.req.param('id');
|
||||
|
||||
try {
|
||||
const instance = await c.env.MY_WORKFLOW.get(instanceId);
|
||||
await instance.pause();
|
||||
|
||||
// Update database
|
||||
await c.env.DB.prepare(`
|
||||
UPDATE workflow_instances SET status = ? WHERE id = ?
|
||||
`).bind('paused', instanceId).run();
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
message: 'Workflow paused'
|
||||
});
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to pause workflow',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Resume paused workflow instance
|
||||
*/
|
||||
app.post('/workflows/:id/resume', async (c) => {
|
||||
const instanceId = c.req.param('id');
|
||||
|
||||
try {
|
||||
const instance = await c.env.MY_WORKFLOW.get(instanceId);
|
||||
await instance.resume();
|
||||
|
||||
// Update database
|
||||
await c.env.DB.prepare(`
|
||||
UPDATE workflow_instances SET status = ? WHERE id = ?
|
||||
`).bind('running', instanceId).run();
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
message: 'Workflow resumed'
|
||||
});
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to resume workflow',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Terminate workflow instance
|
||||
*/
|
||||
app.post('/workflows/:id/terminate', async (c) => {
|
||||
const instanceId = c.req.param('id');
|
||||
|
||||
try {
|
||||
const instance = await c.env.MY_WORKFLOW.get(instanceId);
|
||||
await instance.terminate();
|
||||
|
||||
// Update database
|
||||
await c.env.DB.prepare(`
|
||||
UPDATE workflow_instances SET status = ? WHERE id = ?
|
||||
`).bind('terminated', instanceId).run();
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
message: 'Workflow terminated'
|
||||
});
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to terminate workflow',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* List all workflow instances (with filtering)
|
||||
*/
|
||||
app.get('/workflows', async (c) => {
|
||||
const status = c.req.query('status');
|
||||
const userId = c.req.query('userId');
|
||||
const limit = parseInt(c.req.query('limit') || '20');
|
||||
const offset = parseInt(c.req.query('offset') || '0');
|
||||
|
||||
let query = 'SELECT * FROM workflow_instances WHERE 1=1';
|
||||
const params: any[] = [];
|
||||
|
||||
if (status) {
|
||||
query += ' AND status = ?';
|
||||
params.push(status);
|
||||
}
|
||||
|
||||
if (userId) {
|
||||
query += ' AND user_id = ?';
|
||||
params.push(userId);
|
||||
}
|
||||
|
||||
query += ' ORDER BY created_at DESC LIMIT ? OFFSET ?';
|
||||
params.push(limit, offset);
|
||||
|
||||
try {
|
||||
const results = await c.env.DB.prepare(query).bind(...params).all();
|
||||
|
||||
return c.json({
|
||||
workflows: results.results,
|
||||
limit,
|
||||
offset,
|
||||
total: results.results.length
|
||||
});
|
||||
} catch (error) {
|
||||
return c.json({
|
||||
error: 'Failed to list workflows',
|
||||
message: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 500);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Health check
|
||||
*/
|
||||
app.get('/health', (c) => {
|
||||
return c.json({
|
||||
status: 'ok',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* API documentation
|
||||
*/
|
||||
app.get('/', (c) => {
|
||||
return c.json({
|
||||
name: 'Workflow Management API',
|
||||
version: '1.0.0',
|
||||
endpoints: {
|
||||
'POST /workflows/create': {
|
||||
description: 'Create new workflow instance',
|
||||
body: { userId: 'string', email: 'string', ...params: 'any' }
|
||||
},
|
||||
'GET /workflows/:id': {
|
||||
description: 'Get workflow status',
|
||||
params: { id: 'workflow instance ID' }
|
||||
},
|
||||
'POST /workflows/:id/events': {
|
||||
description: 'Send event to workflow',
|
||||
params: { id: 'workflow instance ID' },
|
||||
body: { type: 'string', payload: 'any' }
|
||||
},
|
||||
'POST /workflows/:id/pause': {
|
||||
description: 'Pause workflow',
|
||||
params: { id: 'workflow instance ID' }
|
||||
},
|
||||
'POST /workflows/:id/resume': {
|
||||
description: 'Resume paused workflow',
|
||||
params: { id: 'workflow instance ID' }
|
||||
},
|
||||
'POST /workflows/:id/terminate': {
|
||||
description: 'Terminate workflow',
|
||||
params: { id: 'workflow instance ID' }
|
||||
},
|
||||
'GET /workflows': {
|
||||
description: 'List workflows',
|
||||
query: { status: 'string (optional)', userId: 'string (optional)', limit: 'number', offset: 'number' }
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
export default app;
|
||||
335
templates/workflow-with-events.ts
Normal file
335
templates/workflow-with-events.ts
Normal file
@@ -0,0 +1,335 @@
|
||||
/**
|
||||
* Event-Driven Workflow Example
|
||||
*
|
||||
* Demonstrates:
|
||||
* - step.waitForEvent() for external events
|
||||
* - instance.sendEvent() to trigger waiting workflows
|
||||
* - Timeout handling
|
||||
* - Human-in-the-loop patterns
|
||||
*/
|
||||
|
||||
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
|
||||
|
||||
type Env = {
|
||||
APPROVAL_WORKFLOW: Workflow;
|
||||
DB: D1Database;
|
||||
};
|
||||
|
||||
type ApprovalParams = {
|
||||
requestId: string;
|
||||
requesterId: string;
|
||||
amount: number;
|
||||
description: string;
|
||||
};
|
||||
|
||||
type ApprovalEvent = {
|
||||
approved: boolean;
|
||||
approverId: string;
|
||||
comments?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Approval Workflow with Event Waiting
|
||||
*
|
||||
* Flow:
|
||||
* 1. Create approval request
|
||||
* 2. Notify approvers
|
||||
* 3. Wait for approval decision (max 7 days)
|
||||
* 4. Process decision
|
||||
* 5. Execute approved action or reject
|
||||
*/
|
||||
export class ApprovalWorkflow extends WorkflowEntrypoint<Env, ApprovalParams> {
|
||||
async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
|
||||
const { requestId, requesterId, amount, description } = event.payload;
|
||||
|
||||
// Step 1: Create approval request in database
|
||||
await step.do('create approval request', async () => {
|
||||
await this.env.DB.prepare(`
|
||||
INSERT INTO approval_requests
|
||||
(id, requester_id, amount, description, status, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
`).bind(
|
||||
requestId,
|
||||
requesterId,
|
||||
amount,
|
||||
description,
|
||||
'pending',
|
||||
new Date().toISOString()
|
||||
).run();
|
||||
|
||||
return { created: true };
|
||||
});
|
||||
|
||||
// Step 2: Send notification to approvers
|
||||
await step.do('notify approvers', async () => {
|
||||
// Get list of approvers based on amount
|
||||
const approvers = amount > 10000
|
||||
? ['senior-manager@example.com', 'finance@example.com']
|
||||
: ['manager@example.com'];
|
||||
|
||||
// Send notification to each approver
|
||||
await fetch('https://api.example.com/send-notifications', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
recipients: approvers,
|
||||
subject: `Approval Required: ${description}`,
|
||||
body: `
|
||||
Request ID: ${requestId}
|
||||
Amount: $${amount}
|
||||
Description: ${description}
|
||||
Requester: ${requesterId}
|
||||
|
||||
Please review and approve/reject at:
|
||||
https://app.example.com/approvals/${requestId}
|
||||
`,
|
||||
data: {
|
||||
requestId,
|
||||
workflowInstanceId: event.instanceId // Store for sending event later
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
return { notified: true, approvers };
|
||||
});
|
||||
|
||||
// Step 3: Wait for approval decision (max 7 days)
|
||||
let approvalEvent: ApprovalEvent;
|
||||
|
||||
try {
|
||||
approvalEvent = await step.waitForEvent<ApprovalEvent>(
|
||||
'wait for approval decision',
|
||||
{
|
||||
type: 'approval-decision',
|
||||
timeout: '7 days' // Auto-reject after 7 days
|
||||
}
|
||||
);
|
||||
|
||||
console.log('Approval decision received:', approvalEvent);
|
||||
} catch (error) {
|
||||
// Timeout occurred - auto-reject
|
||||
console.log('Approval timeout - auto-rejecting');
|
||||
|
||||
await step.do('auto-reject due to timeout', async () => {
|
||||
await this.env.DB.prepare(`
|
||||
UPDATE approval_requests
|
||||
SET status = ?, updated_at = ?, rejection_reason = ?
|
||||
WHERE id = ?
|
||||
`).bind(
|
||||
'rejected',
|
||||
new Date().toISOString(),
|
||||
'Approval timeout - no response within 7 days',
|
||||
requestId
|
||||
).run();
|
||||
|
||||
// Notify requester
|
||||
await this.notifyRequester(requesterId, requestId, false, 'Approval timeout');
|
||||
|
||||
return { rejected: true, reason: 'timeout' };
|
||||
});
|
||||
|
||||
return {
|
||||
requestId,
|
||||
status: 'rejected',
|
||||
reason: 'timeout'
|
||||
};
|
||||
}
|
||||
|
||||
// Step 4: Process approval decision
|
||||
await step.do('process approval decision', async () => {
|
||||
await this.env.DB.prepare(`
|
||||
UPDATE approval_requests
|
||||
SET status = ?, approver_id = ?, comments = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
`).bind(
|
||||
approvalEvent.approved ? 'approved' : 'rejected',
|
||||
approvalEvent.approverId,
|
||||
approvalEvent.comments || null,
|
||||
new Date().toISOString(),
|
||||
requestId
|
||||
).run();
|
||||
|
||||
return { processed: true };
|
||||
});
|
||||
|
||||
// Step 5: Notify requester
|
||||
await step.do('notify requester', async () => {
|
||||
await this.notifyRequester(
|
||||
requesterId,
|
||||
requestId,
|
||||
approvalEvent.approved,
|
||||
approvalEvent.comments
|
||||
);
|
||||
|
||||
return { notified: true };
|
||||
});
|
||||
|
||||
// Step 6: Execute approved action if approved
|
||||
if (approvalEvent.approved) {
|
||||
await step.do('execute approved action', async () => {
|
||||
// Execute the action that was approved
|
||||
console.log(`Executing approved action for request ${requestId}`);
|
||||
|
||||
// Example: Process payment, create resource, etc.
|
||||
await fetch('https://api.example.com/execute-action', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
requestId,
|
||||
amount,
|
||||
description
|
||||
})
|
||||
});
|
||||
|
||||
return { executed: true };
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
requestId,
|
||||
status: approvalEvent.approved ? 'approved' : 'rejected',
|
||||
approver: approvalEvent.approverId
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Send notification to requester
|
||||
*/
|
||||
private async notifyRequester(
|
||||
requesterId: string,
|
||||
requestId: string,
|
||||
approved: boolean,
|
||||
comments?: string
|
||||
) {
|
||||
await fetch('https://api.example.com/send-notification', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
recipient: requesterId,
|
||||
subject: `Request ${requestId} ${approved ? 'Approved' : 'Rejected'}`,
|
||||
body: `
|
||||
Your request ${requestId} has been ${approved ? 'approved' : 'rejected'}.
|
||||
${comments ? `\n\nComments: ${comments}` : ''}
|
||||
`
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker that handles:
|
||||
* 1. Creating approval workflows
|
||||
* 2. Receiving approval decisions via webhook
|
||||
* 3. Sending events to waiting workflows
|
||||
*/
|
||||
export default {
|
||||
async fetch(req: Request, env: Env): Promise<Response> {
|
||||
const url = new URL(req.url);
|
||||
|
||||
if (url.pathname.startsWith('/favicon')) {
|
||||
return Response.json({}, { status: 404 });
|
||||
}
|
||||
|
||||
// Endpoint: Create new approval request
|
||||
if (url.pathname === '/approvals/create' && req.method === 'POST') {
|
||||
const body = await req.json<ApprovalParams>();
|
||||
|
||||
// Create workflow instance
|
||||
const instance = await env.APPROVAL_WORKFLOW.create({
|
||||
params: body
|
||||
});
|
||||
|
||||
// Store instance ID for later (when approval decision comes in)
|
||||
// In production, store this in DB/KV
|
||||
await env.DB.prepare(`
|
||||
UPDATE approval_requests
|
||||
SET workflow_instance_id = ?
|
||||
WHERE id = ?
|
||||
`).bind(instance.id, body.requestId).run();
|
||||
|
||||
return Response.json({
|
||||
id: instance.id,
|
||||
requestId: body.requestId,
|
||||
status: await instance.status()
|
||||
});
|
||||
}
|
||||
|
||||
// Endpoint: Submit approval decision (webhook from approval UI)
|
||||
if (url.pathname === '/approvals/decide' && req.method === 'POST') {
|
||||
const body = await req.json<{
|
||||
requestId: string;
|
||||
approved: boolean;
|
||||
approverId: string;
|
||||
comments?: string;
|
||||
}>();
|
||||
|
||||
// Get workflow instance ID from database
|
||||
const result = await env.DB.prepare(`
|
||||
SELECT workflow_instance_id
|
||||
FROM approval_requests
|
||||
WHERE id = ?
|
||||
`).bind(body.requestId).first<{ workflow_instance_id: string }>();
|
||||
|
||||
if (!result) {
|
||||
return Response.json(
|
||||
{ error: 'Request not found' },
|
||||
{ status: 404 }
|
||||
);
|
||||
}
|
||||
|
||||
// Get workflow instance
|
||||
const instance = await env.APPROVAL_WORKFLOW.get(result.workflow_instance_id);
|
||||
|
||||
// Send event to waiting workflow
|
||||
await instance.sendEvent({
|
||||
type: 'approval-decision',
|
||||
payload: {
|
||||
approved: body.approved,
|
||||
approverId: body.approverId,
|
||||
comments: body.comments
|
||||
}
|
||||
});
|
||||
|
||||
return Response.json({
|
||||
success: true,
|
||||
message: 'Approval decision sent to workflow'
|
||||
});
|
||||
}
|
||||
|
||||
// Endpoint: Get approval status
|
||||
if (url.pathname.startsWith('/approvals/') && req.method === 'GET') {
|
||||
const requestId = url.pathname.split('/')[2];
|
||||
|
||||
const result = await env.DB.prepare(`
|
||||
SELECT workflow_instance_id, status
|
||||
FROM approval_requests
|
||||
WHERE id = ?
|
||||
`).bind(requestId).first<{ workflow_instance_id: string; status: string }>();
|
||||
|
||||
if (!result) {
|
||||
return Response.json(
|
||||
{ error: 'Request not found' },
|
||||
{ status: 404 }
|
||||
);
|
||||
}
|
||||
|
||||
const instance = await env.APPROVAL_WORKFLOW.get(result.workflow_instance_id);
|
||||
const workflowStatus = await instance.status();
|
||||
|
||||
return Response.json({
|
||||
requestId,
|
||||
dbStatus: result.status,
|
||||
workflowStatus
|
||||
});
|
||||
}
|
||||
|
||||
// Default: Show usage
|
||||
return Response.json({
|
||||
endpoints: {
|
||||
'POST /approvals/create': 'Create approval request',
|
||||
'POST /approvals/decide': 'Submit approval decision',
|
||||
'GET /approvals/:id': 'Get approval status'
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
235
templates/workflow-with-retries.ts
Normal file
235
templates/workflow-with-retries.ts
Normal file
@@ -0,0 +1,235 @@
|
||||
/**
|
||||
* Workflow with Advanced Retry Configuration
|
||||
*
|
||||
* Demonstrates:
|
||||
* - Custom retry limits
|
||||
* - Exponential, linear, and constant backoff
|
||||
* - Step timeouts
|
||||
* - NonRetryableError for terminal failures
|
||||
* - Error handling with try-catch
|
||||
*/
|
||||
|
||||
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
|
||||
import { NonRetryableError } from 'cloudflare:workflows';
|
||||
|
||||
type Env = {
|
||||
MY_WORKFLOW: Workflow;
|
||||
};
|
||||
|
||||
type PaymentParams = {
|
||||
orderId: string;
|
||||
amount: number;
|
||||
customerId: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Payment Processing Workflow with Retries
|
||||
*
|
||||
* Handles payment processing with:
|
||||
* - Validation with NonRetryableError
|
||||
* - Retry logic for payment gateway
|
||||
* - Fallback to backup gateway
|
||||
* - Graceful error handling
|
||||
*/
|
||||
export class PaymentWorkflow extends WorkflowEntrypoint<Env, PaymentParams> {
|
||||
async run(event: WorkflowEvent<PaymentParams>, step: WorkflowStep) {
|
||||
const { orderId, amount, customerId } = event.payload;
|
||||
|
||||
// Step 1: Validate input (no retries - fail fast)
|
||||
await step.do(
|
||||
'validate payment request',
|
||||
{
|
||||
retries: {
|
||||
limit: 0 // No retries for validation
|
||||
}
|
||||
},
|
||||
async () => {
|
||||
if (!orderId || !customerId) {
|
||||
throw new NonRetryableError('Missing required fields: orderId or customerId');
|
||||
}
|
||||
|
||||
if (amount <= 0) {
|
||||
throw new NonRetryableError(`Invalid amount: ${amount}`);
|
||||
}
|
||||
|
||||
if (amount > 100000) {
|
||||
throw new NonRetryableError(`Amount exceeds limit: ${amount}`);
|
||||
}
|
||||
|
||||
return { valid: true };
|
||||
}
|
||||
);
|
||||
|
||||
// Step 2: Call primary payment gateway (exponential backoff)
|
||||
let paymentResult;
|
||||
|
||||
try {
|
||||
paymentResult = await step.do(
|
||||
'charge primary payment gateway',
|
||||
{
|
||||
retries: {
|
||||
limit: 5, // Max 5 retry attempts
|
||||
delay: '10 seconds', // Start at 10 seconds
|
||||
backoff: 'exponential' // 10s, 20s, 40s, 80s, 160s
|
||||
},
|
||||
timeout: '2 minutes' // Each attempt times out after 2 minutes
|
||||
},
|
||||
async () => {
|
||||
const response = await fetch('https://primary-payment-gateway.example.com/charge', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
orderId,
|
||||
amount,
|
||||
customerId
|
||||
})
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
// Check if error is retryable
|
||||
if (response.status === 401 || response.status === 403) {
|
||||
throw new NonRetryableError('Authentication failed with payment gateway');
|
||||
}
|
||||
|
||||
throw new Error(`Payment gateway error: ${response.status}`);
|
||||
}
|
||||
|
||||
const data = await response.json();
|
||||
return {
|
||||
transactionId: data.transactionId,
|
||||
status: data.status,
|
||||
gateway: 'primary'
|
||||
};
|
||||
}
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('Primary gateway failed:', error);
|
||||
|
||||
// Step 3: Fallback to backup gateway (linear backoff)
|
||||
paymentResult = await step.do(
|
||||
'charge backup payment gateway',
|
||||
{
|
||||
retries: {
|
||||
limit: 3,
|
||||
delay: '30 seconds',
|
||||
backoff: 'linear' // 30s, 60s, 90s
|
||||
},
|
||||
timeout: '3 minutes'
|
||||
},
|
||||
async () => {
|
||||
const response = await fetch('https://backup-payment-gateway.example.com/charge', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
orderId,
|
||||
amount,
|
||||
customerId
|
||||
})
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Backup gateway error: ${response.status}`);
|
||||
}
|
||||
|
||||
const data = await response.json();
|
||||
return {
|
||||
transactionId: data.transactionId,
|
||||
status: data.status,
|
||||
gateway: 'backup'
|
||||
};
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Step 4: Update order status (constant backoff)
|
||||
await step.do(
|
||||
'update order status',
|
||||
{
|
||||
retries: {
|
||||
limit: 10,
|
||||
delay: '5 seconds',
|
||||
backoff: 'constant' // Always 5 seconds between retries
|
||||
},
|
||||
timeout: '30 seconds'
|
||||
},
|
||||
async () => {
|
||||
const response = await fetch(`https://api.example.com/orders/${orderId}`, {
|
||||
method: 'PATCH',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
status: 'paid',
|
||||
transactionId: paymentResult.transactionId,
|
||||
gateway: paymentResult.gateway
|
||||
})
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error('Failed to update order status');
|
||||
}
|
||||
|
||||
return { updated: true };
|
||||
}
|
||||
);
|
||||
|
||||
// Step 5: Send confirmation (optional - don't fail workflow if this fails)
|
||||
try {
|
||||
await step.do(
|
||||
'send payment confirmation',
|
||||
{
|
||||
retries: {
|
||||
limit: 3,
|
||||
delay: '10 seconds',
|
||||
backoff: 'exponential'
|
||||
}
|
||||
},
|
||||
async () => {
|
||||
await fetch('https://api.example.com/notifications/payment-confirmed', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
orderId,
|
||||
customerId,
|
||||
amount
|
||||
})
|
||||
});
|
||||
|
||||
return { sent: true };
|
||||
}
|
||||
);
|
||||
} catch (error) {
|
||||
// Log but don't fail workflow
|
||||
console.error('Failed to send confirmation:', error);
|
||||
}
|
||||
|
||||
return {
|
||||
orderId,
|
||||
transactionId: paymentResult.transactionId,
|
||||
gateway: paymentResult.gateway,
|
||||
status: 'complete'
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export default {
|
||||
async fetch(req: Request, env: Env): Promise<Response> {
|
||||
const url = new URL(req.url);
|
||||
|
||||
if (url.pathname.startsWith('/favicon')) {
|
||||
return Response.json({}, { status: 404 });
|
||||
}
|
||||
|
||||
// Create payment workflow
|
||||
const instance = await env.MY_WORKFLOW.create({
|
||||
params: {
|
||||
orderId: 'ORD-' + Date.now(),
|
||||
amount: 99.99,
|
||||
customerId: 'CUST-123'
|
||||
}
|
||||
});
|
||||
|
||||
return Response.json({
|
||||
id: instance.id,
|
||||
status: await instance.status()
|
||||
});
|
||||
}
|
||||
};
|
||||
171
templates/wrangler-workflows-config.jsonc
Normal file
171
templates/wrangler-workflows-config.jsonc
Normal file
@@ -0,0 +1,171 @@
|
||||
/**
|
||||
* Complete Wrangler Configuration for Workflows
|
||||
*
|
||||
* This file shows all configuration options for Cloudflare Workflows.
|
||||
* Copy and adapt to your project's needs.
|
||||
*/
|
||||
{
|
||||
"$schema": "node_modules/wrangler/config-schema.json",
|
||||
"name": "my-workflow-project",
|
||||
"main": "src/index.ts",
|
||||
"account_id": "YOUR_ACCOUNT_ID",
|
||||
"compatibility_date": "2025-10-22",
|
||||
|
||||
/**
|
||||
* Workflows Configuration
|
||||
*
|
||||
* Define workflows that can be triggered from Workers.
|
||||
* Each workflow binding makes a workflow class available via env.BINDING_NAME
|
||||
*/
|
||||
"workflows": [
|
||||
{
|
||||
/**
|
||||
* name: The name of the workflow (used in dashboard, CLI commands)
|
||||
* This is the workflow identifier for wrangler commands like:
|
||||
* - npx wrangler workflows instances list my-workflow
|
||||
* - npx wrangler workflows instances describe my-workflow <id>
|
||||
*/
|
||||
"name": "my-workflow",
|
||||
|
||||
/**
|
||||
* binding: The name used in your code to access this workflow
|
||||
* Available in Workers as: env.MY_WORKFLOW
|
||||
*/
|
||||
"binding": "MY_WORKFLOW",
|
||||
|
||||
/**
|
||||
* class_name: The exported class name that extends WorkflowEntrypoint
|
||||
* Must match your TypeScript class:
|
||||
* export class MyWorkflow extends WorkflowEntrypoint { ... }
|
||||
*/
|
||||
"class_name": "MyWorkflow"
|
||||
},
|
||||
|
||||
/**
|
||||
* Example: Multiple workflows in one project
|
||||
*/
|
||||
{
|
||||
"name": "payment-workflow",
|
||||
"binding": "PAYMENT_WORKFLOW",
|
||||
"class_name": "PaymentWorkflow"
|
||||
},
|
||||
{
|
||||
"name": "approval-workflow",
|
||||
"binding": "APPROVAL_WORKFLOW",
|
||||
"class_name": "ApprovalWorkflow"
|
||||
}
|
||||
|
||||
/**
|
||||
* Example: Workflow in different Worker script
|
||||
*
|
||||
* If your workflow is defined in a separate Worker script,
|
||||
* use script_name to reference it
|
||||
*/
|
||||
// {
|
||||
// "name": "external-workflow",
|
||||
// "binding": "EXTERNAL_WORKFLOW",
|
||||
// "class_name": "ExternalWorkflow",
|
||||
// "script_name": "workflow-worker" // Name of Worker that contains the workflow
|
||||
// }
|
||||
],
|
||||
|
||||
/**
|
||||
* Optional: Other Cloudflare bindings
|
||||
*/
|
||||
|
||||
// KV Namespace
|
||||
// "kv_namespaces": [
|
||||
// {
|
||||
// "binding": "MY_KV",
|
||||
// "id": "YOUR_KV_ID"
|
||||
// }
|
||||
// ],
|
||||
|
||||
// D1 Database
|
||||
// "d1_databases": [
|
||||
// {
|
||||
// "binding": "DB",
|
||||
// "database_name": "my-database",
|
||||
// "database_id": "YOUR_DB_ID"
|
||||
// }
|
||||
// ],
|
||||
|
||||
// R2 Bucket
|
||||
// "r2_buckets": [
|
||||
// {
|
||||
// "binding": "MY_BUCKET",
|
||||
// "bucket_name": "my-bucket"
|
||||
// }
|
||||
// ],
|
||||
|
||||
// Queues
|
||||
// "queues": {
|
||||
// "producers": [
|
||||
// {
|
||||
// "binding": "MY_QUEUE",
|
||||
// "queue": "my-queue"
|
||||
// }
|
||||
// ]
|
||||
// },
|
||||
|
||||
/**
|
||||
* Optional: Environment variables
|
||||
*/
|
||||
// "vars": {
|
||||
// "ENVIRONMENT": "production",
|
||||
// "API_URL": "https://api.example.com"
|
||||
// },
|
||||
|
||||
/**
|
||||
* Optional: Observability
|
||||
*/
|
||||
"observability": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
/**
|
||||
* Optional: Multiple environments
|
||||
*/
|
||||
"env": {
|
||||
"staging": {
|
||||
"vars": {
|
||||
"ENVIRONMENT": "staging"
|
||||
},
|
||||
"workflows": [
|
||||
{
|
||||
"name": "my-workflow-staging",
|
||||
"binding": "MY_WORKFLOW",
|
||||
"class_name": "MyWorkflow"
|
||||
}
|
||||
]
|
||||
},
|
||||
"production": {
|
||||
"vars": {
|
||||
"ENVIRONMENT": "production"
|
||||
},
|
||||
"workflows": [
|
||||
{
|
||||
"name": "my-workflow-production",
|
||||
"binding": "MY_WORKFLOW",
|
||||
"class_name": "MyWorkflow"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deployment Commands:
|
||||
*
|
||||
* # Deploy to default (production)
|
||||
* npx wrangler deploy
|
||||
*
|
||||
* # Deploy to staging environment
|
||||
* npx wrangler deploy --env staging
|
||||
*
|
||||
* # List workflow instances
|
||||
* npx wrangler workflows instances list my-workflow
|
||||
*
|
||||
* # Get instance status
|
||||
* npx wrangler workflows instances describe my-workflow <instance-id>
|
||||
*/
|
||||
Reference in New Issue
Block a user