From ab158ea3e9eb9adbb549b391a3b9cce6a43cf9db Mon Sep 17 00:00:00 2001 From: Zhongwei Li Date: Sun, 30 Nov 2025 09:08:46 +0800 Subject: [PATCH] Initial commit --- .claude-plugin/plugin.json | 12 + README.md | 3 + plugin.lock.json | 68 ++ skills/aws-serverless-eda/SKILL.md | 747 ++++++++++++ .../references/deployment-best-practices.md | 830 ++++++++++++++ .../references/eda-patterns.md | 1002 +++++++++++++++++ .../observability-best-practices.md | 770 +++++++++++++ .../references/performance-optimization.md | 671 +++++++++++ .../references/security-best-practices.md | 625 ++++++++++ .../references/serverless-patterns.md | 838 ++++++++++++++ 10 files changed, 5566 insertions(+) create mode 100644 .claude-plugin/plugin.json create mode 100644 README.md create mode 100644 plugin.lock.json create mode 100644 skills/aws-serverless-eda/SKILL.md create mode 100644 skills/aws-serverless-eda/references/deployment-best-practices.md create mode 100644 skills/aws-serverless-eda/references/eda-patterns.md create mode 100644 skills/aws-serverless-eda/references/observability-best-practices.md create mode 100644 skills/aws-serverless-eda/references/performance-optimization.md create mode 100644 skills/aws-serverless-eda/references/security-best-practices.md create mode 100644 skills/aws-serverless-eda/references/serverless-patterns.md diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..335b711 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,12 @@ +{ + "name": "serverless-eda", + "description": "AWS serverless and event-driven architecture best practices based on Well-Architected Framework with MCP servers for SAM, Lambda, Step Functions, and messaging", + "version": "0.0.0-2025.11.28", + "author": { + "name": "Kane Zhu", + "email": "me@kane.mx" + }, + "skills": [ + "./skills/aws-serverless-eda" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..c5801cc --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# serverless-eda + +AWS serverless and event-driven architecture best practices based on Well-Architected Framework with MCP servers for SAM, Lambda, Step Functions, and messaging diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..8ad36ae --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,68 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:zxkane/aws-skills:serverless-eda", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "ff0ed7dd84ee38c5963e2be6ddfd74065c81521b", + "treeHash": "9375638efaf61fa4e3870fbb415c277069d5ddac44feeb5c77f78f30ed9c22f7", + "generatedAt": "2025-11-28T10:29:14.139105Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "serverless-eda", + "description": "AWS serverless and event-driven architecture best practices based on Well-Architected Framework with MCP servers for SAM, Lambda, Step Functions, and messaging" + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "5a7d0d76f54cbae89f1e7147bd50b25ba333fb16f79afb0442f11d7673b1cf2a" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "18793e20fa1c2d078881402c0942a85186cd8e8c0604ce11318466a35e2d5292" + }, + { + "path": "skills/aws-serverless-eda/SKILL.md", + 
"sha256": "f008433c085a85dc0da9063a7e69545f91eebc1253a9018fe0950473b024a3cd" + }, + { + "path": "skills/aws-serverless-eda/references/observability-best-practices.md", + "sha256": "531977c659a774fec3dabd4c369789d5979bceb83cd4656bfe818929569c9e7a" + }, + { + "path": "skills/aws-serverless-eda/references/deployment-best-practices.md", + "sha256": "f94b18c62732f950a29b3e4cd11134ba397c2cfce4b3bb777ee729c17c9f1268" + }, + { + "path": "skills/aws-serverless-eda/references/security-best-practices.md", + "sha256": "511f6e0921f852947db893ca64c093e15f728f6b4e35c40423a50f1398278261" + }, + { + "path": "skills/aws-serverless-eda/references/serverless-patterns.md", + "sha256": "8b7408d9c98f8224290093acb831468b9a01bd8d17436e301d4383c18c556f2c" + }, + { + "path": "skills/aws-serverless-eda/references/eda-patterns.md", + "sha256": "c3518448e773c0e93d1a1f518d3d8d67475995bf3211ae1cf57cac46447ea6e1" + }, + { + "path": "skills/aws-serverless-eda/references/performance-optimization.md", + "sha256": "5086493fbeb4c97c1bc891484d6deecbfbc6d02a9422f6854dd8c7274320bfa6" + } + ], + "dirSha256": "9375638efaf61fa4e3870fbb415c277069d5ddac44feeb5c77f78f30ed9c22f7" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/skills/aws-serverless-eda/SKILL.md b/skills/aws-serverless-eda/SKILL.md new file mode 100644 index 0000000..84251a7 --- /dev/null +++ b/skills/aws-serverless-eda/SKILL.md @@ -0,0 +1,747 @@ +--- +name: aws-serverless-eda +description: AWS serverless and event-driven architecture expert based on Well-Architected Framework. Use when building serverless APIs, Lambda functions, REST APIs, microservices, or async workflows. Covers Lambda with TypeScript/Python, API Gateway (REST/HTTP), DynamoDB, Step Functions, EventBridge, SQS, SNS, and serverless patterns. Essential when user mentions serverless, Lambda, API Gateway, event-driven, async processing, queues, pub/sub, or wants to build scalable serverless applications with AWS best practices. +--- + +# AWS Serverless & Event-Driven Architecture + +This skill provides comprehensive guidance for building serverless applications and event-driven architectures on AWS based on Well-Architected Framework principles. 
+ +## Integrated MCP Servers + +This skill includes 5 MCP servers for serverless development: + +### AWS Documentation MCP Server +**When to use**: Always verify AWS service information before implementation +- Search AWS documentation for latest features and best practices +- Check regional availability of AWS services +- Verify service limits and quotas +- Confirm API specifications and parameters +- Access up-to-date AWS service information + +### AWS Serverless MCP Server +**Purpose**: Complete serverless application lifecycle with SAM CLI +- Initialize new serverless applications +- Deploy serverless applications +- Test Lambda functions locally +- Generate SAM templates +- Manage serverless application lifecycle + +### AWS Lambda Tool MCP Server +**Purpose**: Execute Lambda functions as tools +- Invoke Lambda functions directly +- Test Lambda integrations +- Execute workflows requiring private resource access +- Run Lambda-based automation + +### AWS Step Functions MCP Server +**Purpose**: Execute complex workflows and orchestration +- Create and manage state machines +- Execute workflow orchestrations +- Handle distributed transactions +- Implement saga patterns +- Coordinate microservices + +### Amazon SNS/SQS MCP Server +**Purpose**: Event-driven messaging and queue management +- Publish messages to SNS topics +- Send/receive messages from SQS queues +- Manage event-driven communication +- Implement pub/sub patterns +- Handle asynchronous processing + +## When to Use This Skill + +Use this skill when: +- Building serverless applications with Lambda +- Designing event-driven architectures +- Implementing microservices patterns +- Creating asynchronous processing workflows +- Orchestrating multi-service transactions +- Building real-time data processing pipelines +- Implementing saga patterns for distributed transactions +- Designing for scale and resilience + +## AWS Well-Architected Serverless Design Principles + +### 1. Speedy, Simple, Singular + +**Functions should be concise and single-purpose** + +```typescript +// ✅ GOOD - Single purpose, focused function +export const processOrder = async (event: OrderEvent) => { + // Only handles order processing + const order = await validateOrder(event); + await saveOrder(order); + await publishOrderCreatedEvent(order); + return { statusCode: 200, body: JSON.stringify({ orderId: order.id }) }; +}; + +// ❌ BAD - Function does too much +export const handleEverything = async (event: any) => { + // Handles orders, inventory, payments, shipping... + // Too many responsibilities +}; +``` + +**Keep functions environmentally efficient and cost-aware**: +- Minimize cold start times +- Optimize memory allocation +- Use provisioned concurrency only when needed +- Leverage connection reuse + +### 2. 
Think Concurrent Requests, Not Total Requests + +**Design for concurrency, not volume** + +Lambda scales horizontally - design considerations should focus on: +- Concurrent execution limits +- Downstream service throttling +- Shared resource contention +- Connection pool sizing + +```typescript +// Consider concurrent Lambda executions accessing DynamoDB +const table = new dynamodb.Table(this, 'Table', { + billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, // Auto-scales with load +}); + +// Or with provisioned capacity + auto-scaling +const table = new dynamodb.Table(this, 'Table', { + billingMode: dynamodb.BillingMode.PROVISIONED, + readCapacity: 5, + writeCapacity: 5, +}); + +// Enable auto-scaling for concurrent load +table.autoScaleReadCapacity({ minCapacity: 5, maxCapacity: 100 }); +table.autoScaleWriteCapacity({ minCapacity: 5, maxCapacity: 100 }); +``` + +### 3. Share Nothing + +**Function runtime environments are short-lived** + +```typescript +// ❌ BAD - Relying on local file system +export const handler = async (event: any) => { + fs.writeFileSync('/tmp/data.json', JSON.stringify(data)); // Lost after execution +}; + +// ✅ GOOD - Use persistent storage +export const handler = async (event: any) => { + await s3.putObject({ + Bucket: process.env.BUCKET_NAME, + Key: 'data.json', + Body: JSON.stringify(data), + }); +}; +``` + +**State management**: +- Use DynamoDB for persistent state +- Use Step Functions for workflow state +- Use ElastiCache for session state +- Use S3 for file storage + +### 4. Assume No Hardware Affinity + +**Applications must be hardware-agnostic** + +Infrastructure can change without notice: +- Lambda functions can run on different hardware +- Container instances can be replaced +- No assumption about underlying infrastructure + +**Design for portability**: +- Use environment variables for configuration +- Avoid hardware-specific optimizations +- Test across different environments + +### 5. Orchestrate with State Machines, Not Function Chaining + +**Use Step Functions for orchestration** + +```typescript +// ❌ BAD - Lambda function chaining +export const handler1 = async (event: any) => { + const result = await processStep1(event); + await lambda.invoke({ + FunctionName: 'handler2', + Payload: JSON.stringify(result), + }); +}; + +// ✅ GOOD - Step Functions orchestration +const stateMachine = new stepfunctions.StateMachine(this, 'OrderWorkflow', { + definition: stepfunctions.Chain + .start(validateOrder) + .next(processPayment) + .next(shipOrder) + .next(sendConfirmation), +}); +``` + +**Benefits of Step Functions**: +- Visual workflow representation +- Built-in error handling and retries +- Execution history and debugging +- Parallel and sequential execution +- Service integrations without code + +### 6. Use Events to Trigger Transactions + +**Event-driven over synchronous request/response** + +```typescript +// Pattern: Event-driven processing +const bucket = new s3.Bucket(this, 'DataBucket'); + +bucket.addEventNotification( + s3.EventType.OBJECT_CREATED, + new s3n.LambdaDestination(processFunction), + { prefix: 'uploads/' } +); + +// Pattern: EventBridge integration +const rule = new events.Rule(this, 'OrderRule', { + eventPattern: { + source: ['orders'], + detailType: ['OrderPlaced'], + }, +}); + +rule.addTarget(new targets.LambdaFunction(processOrderFunction)); +``` + +**Benefits**: +- Loose coupling between services +- Asynchronous processing +- Better fault tolerance +- Independent scaling + +### 7. 
Design for Failures and Duplicates + +**Operations must be idempotent** + +```typescript +// ✅ GOOD - Idempotent operation +export const handler = async (event: SQSEvent) => { + for (const record of event.Records) { + const orderId = JSON.parse(record.body).orderId; + + // Check if already processed (idempotency) + const existing = await dynamodb.getItem({ + TableName: process.env.TABLE_NAME, + Key: { orderId }, + }); + + if (existing.Item) { + console.log('Order already processed:', orderId); + continue; // Skip duplicate + } + + // Process order + await processOrder(orderId); + + // Mark as processed + await dynamodb.putItem({ + TableName: process.env.TABLE_NAME, + Item: { orderId, processedAt: Date.now() }, + }); + } +}; +``` + +**Implement retry logic with exponential backoff**: +```typescript +async function withRetry(fn: () => Promise, maxRetries = 3): Promise { + for (let i = 0; i < maxRetries; i++) { + try { + return await fn(); + } catch (error) { + if (i === maxRetries - 1) throw error; + await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 1000)); + } + } + throw new Error('Max retries exceeded'); +} +``` + +## Event-Driven Architecture Patterns + +### Pattern 1: Event Router (EventBridge) + +Use EventBridge for event routing and filtering: + +```typescript +// Create custom event bus +const eventBus = new events.EventBus(this, 'AppEventBus', { + eventBusName: 'application-events', +}); + +// Define event schema +const schema = new events.Schema(this, 'OrderSchema', { + schemaName: 'OrderPlaced', + definition: events.SchemaDefinition.fromInline({ + openapi: '3.0.0', + info: { version: '1.0.0', title: 'Order Events' }, + paths: {}, + components: { + schemas: { + OrderPlaced: { + type: 'object', + properties: { + orderId: { type: 'string' }, + customerId: { type: 'string' }, + amount: { type: 'number' }, + }, + }, + }, + }, + }), +}); + +// Create rules for different consumers +new events.Rule(this, 'ProcessOrderRule', { + eventBus, + eventPattern: { + source: ['orders'], + detailType: ['OrderPlaced'], + }, + targets: [new targets.LambdaFunction(processOrderFunction)], +}); + +new events.Rule(this, 'NotifyCustomerRule', { + eventBus, + eventPattern: { + source: ['orders'], + detailType: ['OrderPlaced'], + }, + targets: [new targets.LambdaFunction(notifyCustomerFunction)], +}); +``` + +### Pattern 2: Queue-Based Processing (SQS) + +Use SQS for reliable asynchronous processing: + +```typescript +// Standard queue for at-least-once delivery +const queue = new sqs.Queue(this, 'ProcessingQueue', { + visibilityTimeout: Duration.seconds(300), + retentionPeriod: Duration.days(14), + deadLetterQueue: { + queue: dlq, + maxReceiveCount: 3, + }, +}); + +// FIFO queue for ordered processing +const fifoQueue = new sqs.Queue(this, 'OrderedQueue', { + fifo: true, + contentBasedDeduplication: true, + deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP, +}); + +// Lambda consumer +new lambda.EventSourceMapping(this, 'QueueConsumer', { + target: processingFunction, + eventSourceArn: queue.queueArn, + batchSize: 10, + maxBatchingWindow: Duration.seconds(5), +}); +``` + +### Pattern 3: Pub/Sub (SNS + SQS Fan-Out) + +Implement fan-out pattern for multiple consumers: + +```typescript +// Create SNS topic +const topic = new sns.Topic(this, 'OrderTopic', { + displayName: 'Order Events', +}); + +// Multiple SQS queues subscribe to topic +const inventoryQueue = new sqs.Queue(this, 'InventoryQueue'); +const shippingQueue = new sqs.Queue(this, 'ShippingQueue'); +const analyticsQueue = new 
sqs.Queue(this, 'AnalyticsQueue'); + +topic.addSubscription(new subscriptions.SqsSubscription(inventoryQueue)); +topic.addSubscription(new subscriptions.SqsSubscription(shippingQueue)); +topic.addSubscription(new subscriptions.SqsSubscription(analyticsQueue)); + +// Each queue has its own Lambda consumer +new lambda.EventSourceMapping(this, 'InventoryConsumer', { + target: inventoryFunction, + eventSourceArn: inventoryQueue.queueArn, +}); +``` + +### Pattern 4: Saga Pattern with Step Functions + +Implement distributed transactions: + +```typescript +const reserveFlight = new tasks.LambdaInvoke(this, 'ReserveFlight', { + lambdaFunction: reserveFlightFunction, + outputPath: '$.Payload', +}); + +const reserveHotel = new tasks.LambdaInvoke(this, 'ReserveHotel', { + lambdaFunction: reserveHotelFunction, + outputPath: '$.Payload', +}); + +const processPayment = new tasks.LambdaInvoke(this, 'ProcessPayment', { + lambdaFunction: processPaymentFunction, + outputPath: '$.Payload', +}); + +// Compensating transactions +const cancelFlight = new tasks.LambdaInvoke(this, 'CancelFlight', { + lambdaFunction: cancelFlightFunction, +}); + +const cancelHotel = new tasks.LambdaInvoke(this, 'CancelHotel', { + lambdaFunction: cancelHotelFunction, +}); + +// Define saga with compensation +const definition = reserveFlight + .next(reserveHotel) + .next(processPayment) + .addCatch(cancelHotel.next(cancelFlight), { + resultPath: '$.error', + }); + +new stepfunctions.StateMachine(this, 'BookingStateMachine', { + definition, + timeout: Duration.minutes(5), +}); +``` + +### Pattern 5: Event Sourcing + +Store events as source of truth: + +```typescript +// Event store with DynamoDB +const eventStore = new dynamodb.Table(this, 'EventStore', { + partitionKey: { name: 'aggregateId', type: dynamodb.AttributeType.STRING }, + sortKey: { name: 'version', type: dynamodb.AttributeType.NUMBER }, + stream: dynamodb.StreamViewType.NEW_IMAGE, +}); + +// Lambda function stores events +export const handleCommand = async (event: any) => { + const { aggregateId, eventType, eventData } = event; + + // Get current version + const items = await dynamodb.query({ + TableName: process.env.EVENT_STORE, + KeyConditionExpression: 'aggregateId = :id', + ExpressionAttributeValues: { ':id': aggregateId }, + ScanIndexForward: false, + Limit: 1, + }); + + const nextVersion = items.Items?.[0]?.version + 1 || 1; + + // Append new event + await dynamodb.putItem({ + TableName: process.env.EVENT_STORE, + Item: { + aggregateId, + version: nextVersion, + eventType, + eventData, + timestamp: Date.now(), + }, + }); +}; + +// Projections read from event stream +eventStore.grantStreamRead(projectionFunction); +``` + +## Serverless Architecture Patterns + +### Pattern 1: API-Driven Microservices + +REST APIs with Lambda backend: + +```typescript +const api = new apigateway.RestApi(this, 'Api', { + restApiName: 'microservices-api', + deployOptions: { + throttlingRateLimit: 1000, + throttlingBurstLimit: 2000, + tracingEnabled: true, + }, +}); + +// User service +const users = api.root.addResource('users'); +users.addMethod('GET', new apigateway.LambdaIntegration(getUsersFunction)); +users.addMethod('POST', new apigateway.LambdaIntegration(createUserFunction)); + +// Order service +const orders = api.root.addResource('orders'); +orders.addMethod('GET', new apigateway.LambdaIntegration(getOrdersFunction)); +orders.addMethod('POST', new apigateway.LambdaIntegration(createOrderFunction)); +``` + +### Pattern 2: Stream Processing + +Real-time data processing with 
Kinesis: + +```typescript +const stream = new kinesis.Stream(this, 'DataStream', { + shardCount: 2, + retentionPeriod: Duration.days(7), +}); + +// Lambda processes stream records +new lambda.EventSourceMapping(this, 'StreamProcessor', { + target: processFunction, + eventSourceArn: stream.streamArn, + batchSize: 100, + maxBatchingWindow: Duration.seconds(5), + parallelizationFactor: 10, + startingPosition: lambda.StartingPosition.LATEST, + retryAttempts: 3, + bisectBatchOnError: true, + onFailure: new lambdaDestinations.SqsDestination(dlq), +}); +``` + +### Pattern 3: Async Task Processing + +Background job processing: + +```typescript +// SQS queue for tasks +const taskQueue = new sqs.Queue(this, 'TaskQueue', { + visibilityTimeout: Duration.minutes(5), + receiveMessageWaitTime: Duration.seconds(20), // Long polling + deadLetterQueue: { + queue: dlq, + maxReceiveCount: 3, + }, +}); + +// Lambda worker processes tasks +const worker = new lambda.Function(this, 'TaskWorker', { + // ... configuration + reservedConcurrentExecutions: 10, // Control concurrency +}); + +new lambda.EventSourceMapping(this, 'TaskConsumer', { + target: worker, + eventSourceArn: taskQueue.queueArn, + batchSize: 10, + reportBatchItemFailures: true, // Partial batch failure handling +}); +``` + +### Pattern 4: Scheduled Jobs + +Periodic processing with EventBridge: + +```typescript +// Daily cleanup job +new events.Rule(this, 'DailyCleanup', { + schedule: events.Schedule.cron({ hour: '2', minute: '0' }), + targets: [new targets.LambdaFunction(cleanupFunction)], +}); + +// Process every 5 minutes +new events.Rule(this, 'FrequentProcessing', { + schedule: events.Schedule.rate(Duration.minutes(5)), + targets: [new targets.LambdaFunction(processFunction)], +}); +``` + +### Pattern 5: Webhook Processing + +Handle external webhooks: + +```typescript +// API Gateway endpoint for webhooks +const webhookApi = new apigateway.RestApi(this, 'WebhookApi', { + restApiName: 'webhooks', +}); + +const webhook = webhookApi.root.addResource('webhook'); +webhook.addMethod('POST', new apigateway.LambdaIntegration(webhookFunction, { + proxy: true, + timeout: Duration.seconds(29), // API Gateway max +})); + +// Lambda handler validates and queues webhook +export const handler = async (event: APIGatewayProxyEvent) => { + // Validate webhook signature + const isValid = validateSignature(event.headers, event.body); + if (!isValid) { + return { statusCode: 401, body: 'Invalid signature' }; + } + + // Queue for async processing + await sqs.sendMessage({ + QueueUrl: process.env.QUEUE_URL, + MessageBody: event.body, + }); + + // Return immediately + return { statusCode: 202, body: 'Accepted' }; +}; +``` + +## Best Practices + +### Error Handling + +**Implement comprehensive error handling**: + +```typescript +export const handler = async (event: SQSEvent) => { + const failures: SQSBatchItemFailure[] = []; + + for (const record of event.Records) { + try { + await processRecord(record); + } catch (error) { + console.error('Failed to process record:', record.messageId, error); + failures.push({ itemIdentifier: record.messageId }); + } + } + + // Return partial batch failures for retry + return { batchItemFailures: failures }; +}; +``` + +### Dead Letter Queues + +**Always configure DLQs for error handling**: + +```typescript +const dlq = new sqs.Queue(this, 'DLQ', { + retentionPeriod: Duration.days(14), +}); + +const queue = new sqs.Queue(this, 'Queue', { + deadLetterQueue: { + queue: dlq, + maxReceiveCount: 3, + }, +}); + +// Monitor DLQ depth +new 
cloudwatch.Alarm(this, 'DLQAlarm', { + metric: dlq.metricApproximateNumberOfMessagesVisible(), + threshold: 1, + evaluationPeriods: 1, + alarmDescription: 'Messages in DLQ require attention', +}); +``` + +### Observability + +**Enable tracing and monitoring**: + +```typescript +new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + tracing: lambda.Tracing.ACTIVE, // X-Ray tracing + environment: { + POWERTOOLS_SERVICE_NAME: 'order-service', + POWERTOOLS_METRICS_NAMESPACE: 'MyApp', + LOG_LEVEL: 'INFO', + }, +}); +``` + +## Using MCP Servers Effectively + +### AWS Serverless MCP Usage + +**Lifecycle management**: +- Initialize new serverless projects +- Generate SAM templates +- Deploy applications +- Test locally before deployment + +### Lambda Tool MCP Usage + +**Function execution**: +- Test Lambda functions directly +- Execute automation workflows +- Access private resources +- Validate integrations + +### Step Functions MCP Usage + +**Workflow orchestration**: +- Create state machines for complex workflows +- Execute distributed transactions +- Implement saga patterns +- Coordinate microservices + +### SNS/SQS MCP Usage + +**Messaging operations**: +- Test pub/sub patterns +- Send test messages to queues +- Validate event routing +- Debug message processing + +## Additional Resources + +This skill includes comprehensive reference documentation based on AWS best practices: + +- **Serverless Patterns**: `references/serverless-patterns.md` + - Core serverless architectures and API patterns + - Data processing and integration patterns + - Orchestration with Step Functions + - Anti-patterns to avoid + +- **Event-Driven Architecture Patterns**: `references/eda-patterns.md` + - Event routing and processing patterns + - Event sourcing and saga patterns + - Idempotency and error handling + - Message ordering and deduplication + +- **Security Best Practices**: `references/security-best-practices.md` + - Shared responsibility model + - IAM least privilege patterns + - Data protection and encryption + - Network security with VPC + +- **Observability Best Practices**: `references/observability-best-practices.md` + - Three pillars: metrics, logs, traces + - Structured logging with Lambda Powertools + - X-Ray distributed tracing + - CloudWatch alarms and dashboards + +- **Performance Optimization**: `references/performance-optimization.md` + - Cold start optimization techniques + - Memory and CPU optimization + - Package size reduction + - Provisioned concurrency patterns + +- **Deployment Best Practices**: `references/deployment-best-practices.md` + - CI/CD pipeline design + - Testing strategies (unit, integration, load) + - Deployment strategies (canary, blue/green) + - Rollback and safety mechanisms + +**External Resources**: +- **AWS Well-Architected Serverless Lens**: https://docs.aws.amazon.com/wellarchitected/latest/serverless-applications-lens/ +- **ServerlessLand.com**: Pre-built serverless patterns +- **AWS Serverless Workshops**: https://serverlessland.com/learn?type=Workshops + +For detailed implementation patterns, anti-patterns, and code examples, refer to the comprehensive references in the skill directory. 
diff --git a/skills/aws-serverless-eda/references/deployment-best-practices.md b/skills/aws-serverless-eda/references/deployment-best-practices.md new file mode 100644 index 0000000..7779379 --- /dev/null +++ b/skills/aws-serverless-eda/references/deployment-best-practices.md @@ -0,0 +1,830 @@ +# Serverless Deployment Best Practices + +Deployment best practices for serverless applications including CI/CD, testing, and deployment strategies. + +## Table of Contents + +- [Software Release Process](#software-release-process) +- [Infrastructure as Code](#infrastructure-as-code) +- [CI/CD Pipeline Design](#cicd-pipeline-design) +- [Testing Strategies](#testing-strategies) +- [Deployment Strategies](#deployment-strategies) +- [Rollback and Safety](#rollback-and-safety) + +## Software Release Process + +### Four Stages of Release + +**1. Source Phase**: +- Developers commit code changes +- Code review (peer review) +- Version control (Git) + +**2. Build Phase**: +- Compile code +- Run unit tests +- Style checking and linting +- Create deployment packages +- Build container images + +**3. Test Phase**: +- Integration tests with other systems +- Load testing +- UI testing +- Security testing (penetration testing) +- Acceptance testing + +**4. Production Phase**: +- Deploy to production environment +- Monitor for errors +- Validate deployment success +- Rollback if needed + +### CI/CD Maturity Levels + +**Continuous Integration (CI)**: +- Automated build on code commit +- Automated unit testing +- Manual deployment to test/production + +**Continuous Delivery (CD)**: +- Automated deployment to test environments +- Manual approval for production +- Automated testing in non-prod + +**Continuous Deployment**: +- Fully automated pipeline +- Automated deployment to production +- No manual intervention after code commit + +## Infrastructure as Code + +### Framework Selection + +**AWS SAM (Serverless Application Model)**: + +```yaml +# template.yaml +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 + +Resources: + OrderFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.handler + Runtime: nodejs20.x + CodeUri: src/ + Events: + Api: + Type: Api + Properties: + Path: /orders + Method: post +``` + +**Benefits**: +- Simple, serverless-focused syntax +- Built-in best practices +- SAM CLI for local testing +- Integrates with CodeDeploy + +**AWS CDK**: + +```typescript +new NodejsFunction(this, 'OrderFunction', { + entry: 'src/orders/handler.ts', + environment: { + TABLE_NAME: ordersTable.tableName, + }, +}); + +ordersTable.grantReadWriteData(orderFunction); +``` + +**Benefits**: +- Type-safe, programmatic +- Reusable constructs +- Rich AWS service support +- Better for complex infrastructure + +**When to use**: +- **SAM**: Serverless-only applications, simpler projects +- **CDK**: Complex infrastructure, multiple services, reusable patterns + +### Environment Management + +**Separate environments**: + +```typescript +// CDK App +const app = new cdk.App(); + +new ServerlessStack(app, 'DevStack', { + env: { account: '111111111111', region: 'us-east-1' }, + environment: 'dev', + logLevel: 'DEBUG', +}); + +new ServerlessStack(app, 'ProdStack', { + env: { account: '222222222222', region: 'us-east-1' }, + environment: 'prod', + logLevel: 'INFO', +}); +``` + +**SAM with parameters**: + +```yaml +Parameters: + Environment: + Type: String + Default: dev + AllowedValues: + - dev + - staging + - prod + +Resources: + Function: + Type: AWS::Serverless::Function + Properties: + 
Environment: + Variables: + ENVIRONMENT: !Ref Environment + LOG_LEVEL: !If [IsProd, INFO, DEBUG] +``` + +## CI/CD Pipeline Design + +### AWS CodePipeline + +**Comprehensive pipeline**: + +```typescript +import * as codepipeline from 'aws-cdk-lib/aws-codepipeline'; +import * as codepipeline_actions from 'aws-cdk-lib/aws-codepipeline-actions'; + +const sourceOutput = new codepipeline.Artifact(); +const buildOutput = new codepipeline.Artifact(); + +const pipeline = new codepipeline.Pipeline(this, 'Pipeline', { + pipelineName: 'serverless-pipeline', +}); + +// Source stage +pipeline.addStage({ + stageName: 'Source', + actions: [ + new codepipeline_actions.CodeStarConnectionsSourceAction({ + actionName: 'GitHub_Source', + owner: 'myorg', + repo: 'myrepo', + branch: 'main', + output: sourceOutput, + connectionArn: githubConnection.connectionArn, + }), + ], +}); + +// Build stage +pipeline.addStage({ + stageName: 'Build', + actions: [ + new codepipeline_actions.CodeBuildAction({ + actionName: 'Build', + project: buildProject, + input: sourceOutput, + outputs: [buildOutput], + }), + ], +}); + +// Test stage +pipeline.addStage({ + stageName: 'Test', + actions: [ + new codepipeline_actions.CloudFormationCreateUpdateStackAction({ + actionName: 'Deploy_Test', + templatePath: buildOutput.atPath('packaged.yaml'), + stackName: 'test-stack', + adminPermissions: true, + }), + new codepipeline_actions.CodeBuildAction({ + actionName: 'Integration_Tests', + project: testProject, + input: buildOutput, + runOrder: 2, + }), + ], +}); + +// Production stage (with manual approval) +pipeline.addStage({ + stageName: 'Production', + actions: [ + new codepipeline_actions.ManualApprovalAction({ + actionName: 'Approve', + }), + new codepipeline_actions.CloudFormationCreateUpdateStackAction({ + actionName: 'Deploy_Prod', + templatePath: buildOutput.atPath('packaged.yaml'), + stackName: 'prod-stack', + adminPermissions: true, + runOrder: 2, + }), + ], +}); +``` + +### GitHub Actions + +**Serverless deployment workflow**: + +```yaml +# .github/workflows/deploy.yml +name: Deploy Serverless Application + +on: + push: + branches: [main] + +jobs: + build-and-deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Setup Node.js + uses: actions/setup-node@v3 + with: + node-version: '20' + + - name: Install dependencies + run: npm ci + + - name: Run tests + run: npm test + + - name: Setup SAM CLI + uses: aws-actions/setup-sam@v2 + + - name: Build SAM application + run: sam build + + - name: Deploy to Dev + if: github.ref != 'refs/heads/main' + run: | + sam deploy \ + --no-confirm-changeset \ + --no-fail-on-empty-changeset \ + --stack-name dev-stack \ + --parameter-overrides Environment=dev + + - name: Run integration tests + run: npm run test:integration + + - name: Deploy to Prod + if: github.ref == 'refs/heads/main' + run: | + sam deploy \ + --no-confirm-changeset \ + --no-fail-on-empty-changeset \ + --stack-name prod-stack \ + --parameter-overrides Environment=prod +``` + +## Testing Strategies + +### Unit Testing + +**Test business logic independently**: + +```typescript +// handler.ts +export const processOrder = (order: Order): ProcessedOrder => { + // Pure business logic (easily testable) + validateOrder(order); + calculateTotal(order); + return transformOrder(order); +}; + +export const handler = async (event: any) => { + const order = parseEvent(event); + const processed = processOrder(order); // Testable function + await saveToDatabase(processed); + return formatResponse(processed); +}; + +// 
handler.test.ts +import { processOrder } from './handler'; + +describe('processOrder', () => { + it('calculates total correctly', () => { + const order = { + items: [ + { price: 10, quantity: 2 }, + { price: 5, quantity: 3 }, + ], + }; + + const result = processOrder(order); + + expect(result.total).toBe(35); + }); + + it('throws on invalid order', () => { + const invalid = { items: [] }; + expect(() => processOrder(invalid)).toThrow(); + }); +}); +``` + +### Integration Testing + +**Test in actual AWS environment**: + +```typescript +// integration.test.ts +import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda'; +import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; + +describe('Order Processing Integration', () => { + const lambda = new LambdaClient({}); + const dynamodb = new DynamoDBClient({}); + + it('processes order end-to-end', async () => { + // Invoke Lambda + const response = await lambda.send(new InvokeCommand({ + FunctionName: process.env.FUNCTION_NAME, + Payload: JSON.stringify({ + orderId: 'test-123', + items: [{ productId: 'prod-1', quantity: 2 }], + }), + })); + + const result = JSON.parse(Buffer.from(response.Payload!).toString()); + + expect(result.statusCode).toBe(200); + + // Verify database write + const dbResult = await dynamodb.send(new GetItemCommand({ + TableName: process.env.TABLE_NAME, + Key: { orderId: { S: 'test-123' } }, + })); + + expect(dbResult.Item).toBeDefined(); + expect(dbResult.Item?.status.S).toBe('PROCESSED'); + }); +}); +``` + +### Local Testing with SAM + +**Test locally before deployment**: + +```bash +# Start local API +sam local start-api + +# Invoke function locally +sam local invoke OrderFunction -e events/create-order.json + +# Generate sample events +sam local generate-event apigateway aws-proxy > event.json + +# Debug locally +sam local invoke OrderFunction -d 5858 + +# Test with Docker +sam local start-api --docker-network my-network +``` + +### Load Testing + +**Test under production load**: + +```bash +# Install Artillery +npm install -g artillery + +# Create load test +cat > load-test.yml < { + // Fetch feature flags + const config = await appconfig.getLatestConfiguration({ + ConfigurationToken: token, + }); + + const features = JSON.parse(config.Configuration.toString()); + + if (features.newFeatureEnabled) { + return newFeatureHandler(event); + } + + return legacyHandler(event); +}; +``` + +## Summary + +- **IaC**: Use SAM or CDK for all deployments +- **Environments**: Separate dev, staging, production +- **CI/CD**: Automate build, test, and deployment +- **Testing**: Unit, integration, and load testing +- **Gradual Deployment**: Use canary or linear for production +- **Alarms**: Configure and monitor during deployment +- **Rollback**: Enable automatic rollback on failures +- **Hooks**: Validate before and after traffic shifts +- **Versioning**: Use Lambda versions and aliases +- **Multi-Region**: Plan for disaster recovery diff --git a/skills/aws-serverless-eda/references/eda-patterns.md b/skills/aws-serverless-eda/references/eda-patterns.md new file mode 100644 index 0000000..ba5c562 --- /dev/null +++ b/skills/aws-serverless-eda/references/eda-patterns.md @@ -0,0 +1,1002 @@ +# Event-Driven Architecture Patterns + +Comprehensive patterns for building event-driven systems on AWS with serverless technologies. 
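+
+Many of the handler snippets below publish events through a shorthand `eventBridge.putEvents({ Entries: [...] })` helper. With the AWS SDK for JavaScript v3, that helper can be a thin wrapper like the following sketch — the custom bus name and the wrapper itself are assumptions, not part of any specific pattern:
+
+```typescript
+import { EventBridgeClient, PutEventsCommand, PutEventsRequestEntry } from '@aws-sdk/client-eventbridge';
+
+const client = new EventBridgeClient({});
+
+// Thin wrapper matching the `eventBridge.putEvents(...)` shorthand used in this document
+export const eventBridge = {
+  putEvents: ({ Entries }: { Entries: PutEventsRequestEntry[] }) =>
+    client.send(new PutEventsCommand({
+      // Default to a custom bus; 'application-events' is an assumed name
+      Entries: Entries.map((entry) => ({ EventBusName: 'application-events', ...entry })),
+    })),
+};
+```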
+ +## Table of Contents + +- [Core EDA Concepts](#core-eda-concepts) +- [Event Routing Patterns](#event-routing-patterns) +- [Event Processing Patterns](#event-processing-patterns) +- [Event Sourcing Patterns](#event-sourcing-patterns) +- [Saga Patterns](#saga-patterns) +- [Best Practices](#best-practices) + +## Core EDA Concepts + +### Event Types + +**Domain Events**: Represent business facts +```json +{ + "source": "orders", + "detailType": "OrderPlaced", + "detail": { + "orderId": "12345", + "customerId": "customer-1", + "amount": 100.00, + "timestamp": "2025-01-15T10:30:00Z" + } +} +``` + +**System Events**: Technical occurrences +```json +{ + "source": "aws.s3", + "detailType": "Object Created", + "detail": { + "bucket": "my-bucket", + "key": "data/file.json" + } +} +``` + +### Event Contracts + +Define clear contracts between producers and consumers: + +```typescript +// schemas/order-events.ts +export interface OrderPlacedEvent { + orderId: string; + customerId: string; + items: Array<{ + productId: string; + quantity: number; + price: number; + }>; + totalAmount: number; + timestamp: string; +} + +// Register schema with EventBridge +const registry = new events.EventBusSchemaRegistry(this, 'SchemaRegistry'); + +const schema = new events.Schema(this, 'OrderPlacedSchema', { + schemaName: 'OrderPlaced', + definition: events.SchemaDefinition.fromInline(/* JSON Schema */), +}); +``` + +## Event Routing Patterns + +### Pattern 1: Content-Based Routing + +Route events based on content: + +```typescript +// Route by order amount +new events.Rule(this, 'HighValueOrders', { + eventPattern: { + source: ['orders'], + detailType: ['OrderPlaced'], + detail: { + totalAmount: [{ numeric: ['>', 1000] }], + }, + }, + targets: [new targets.LambdaFunction(highValueOrderFunction)], +}); + +new events.Rule(this, 'StandardOrders', { + eventPattern: { + source: ['orders'], + detailType: ['OrderPlaced'], + detail: { + totalAmount: [{ numeric: ['<=', 1000] }], + }, + }, + targets: [new targets.LambdaFunction(standardOrderFunction)], +}); +``` + +### Pattern 2: Event Filtering + +Filter events before processing: + +```typescript +// Filter by multiple criteria +new events.Rule(this, 'FilteredRule', { + eventPattern: { + source: ['inventory'], + detailType: ['StockUpdate'], + detail: { + warehouseId: ['WH-1', 'WH-2'], // Specific warehouses + quantity: [{ numeric: ['<', 10] }], // Low stock only + productCategory: ['electronics'], // Specific category + }, + }, + targets: [new targets.LambdaFunction(reorderFunction)], +}); +``` + +### Pattern 3: Event Replay and Archive + +Store events for replay and audit: + +```typescript +// Archive all events +const archive = new events.Archive(this, 'EventArchive', { + eventPattern: { + account: [this.account], + }, + retention: Duration.days(365), +}); + +// Replay events when needed +// Use AWS Console or CLI to replay from archive +``` + +### Pattern 4: Cross-Account Event Routing + +Route events to other AWS accounts: + +```typescript +// Event bus in Account A +const eventBus = new events.EventBus(this, 'SharedBus'); + +// Grant permission to Account B +eventBus.addToResourcePolicy(new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + principals: [new iam.AccountPrincipal('ACCOUNT-B-ID')], + actions: ['events:PutEvents'], + resources: [eventBus.eventBusArn], +})); + +// Rule forwards to Account B event bus +new events.Rule(this, 'ForwardToAccountB', { + eventBus, + eventPattern: { + source: ['shared-service'], + }, + targets: [new targets.EventBus( + 
events.EventBus.fromEventBusArn( + this, + 'AccountBBus', + 'arn:aws:events:us-east-1:ACCOUNT-B-ID:event-bus/default' + ) + )], +}); +``` + +## Event Processing Patterns + +### Pattern 1: Event Transformation + +Transform events before routing: + +```typescript +// EventBridge input transformer +new events.Rule(this, 'TransformRule', { + eventPattern: { + source: ['orders'], + }, + targets: [new targets.LambdaFunction(processFunction, { + event: events.RuleTargetInput.fromObject({ + orderId: events.EventField.fromPath('$.detail.orderId'), + customerEmail: events.EventField.fromPath('$.detail.customer.email'), + amount: events.EventField.fromPath('$.detail.totalAmount'), + // Transformed structure + }), + })], +}); +``` + +### Pattern 2: Event Aggregation + +Aggregate multiple events: + +```typescript +// DynamoDB stores partial results +export const handler = async (event: any) => { + const { transactionId, step, data } = event; + + // Store step result + await dynamodb.updateItem({ + TableName: process.env.TABLE_NAME, + Key: { transactionId }, + UpdateExpression: 'SET #step = :data', + ExpressionAttributeNames: { '#step': step }, + ExpressionAttributeValues: { ':data': data }, + }); + + // Check if all steps complete + const item = await dynamodb.getItem({ + TableName: process.env.TABLE_NAME, + Key: { transactionId }, + }); + + if (allStepsComplete(item)) { + // Trigger final processing + await eventBridge.putEvents({ + Entries: [{ + Source: 'aggregator', + DetailType: 'AllStepsComplete', + Detail: JSON.stringify(item), + }], + }); + } +}; +``` + +### Pattern 3: Event Enrichment + +Enrich events with additional data: + +```typescript +export const enrichEvent = async (event: any) => { + const { customerId } = event.detail; + + // Fetch additional customer data + const customer = await dynamodb.getItem({ + TableName: process.env.CUSTOMER_TABLE, + Key: { customerId }, + }); + + // Publish enriched event + await eventBridge.putEvents({ + Entries: [{ + Source: 'orders', + DetailType: 'OrderEnriched', + Detail: JSON.stringify({ + ...event.detail, + customerName: customer.Item?.name, + customerTier: customer.Item?.tier, + customerEmail: customer.Item?.email, + }), + }], + }); +}; +``` + +### Pattern 4: Event Fork and Join + +Process event multiple ways then aggregate: + +```typescript +// Step Functions parallel + aggregation +const parallel = new stepfunctions.Parallel(this, 'ForkProcessing'); + +parallel.branch(new tasks.LambdaInvoke(this, 'ValidateInventory', { + lambdaFunction: inventoryFunction, + resultPath: '$.inventory', +})); + +parallel.branch(new tasks.LambdaInvoke(this, 'CheckCredit', { + lambdaFunction: creditFunction, + resultPath: '$.credit', +})); + +parallel.branch(new tasks.LambdaInvoke(this, 'CalculateShipping', { + lambdaFunction: shippingFunction, + resultPath: '$.shipping', +})); + +const definition = parallel.next( + new tasks.LambdaInvoke(this, 'AggregateResults', { + lambdaFunction: aggregateFunction, + }) +); +``` + +## Event Sourcing Patterns + +### Pattern: Event Store with DynamoDB + +Store all events as source of truth: + +```typescript +const eventStore = new dynamodb.Table(this, 'EventStore', { + partitionKey: { name: 'aggregateId', type: dynamodb.AttributeType.STRING }, + sortKey: { name: 'version', type: dynamodb.AttributeType.NUMBER }, + stream: dynamodb.StreamViewType.NEW_IMAGE, + pointInTimeRecovery: true, // Important for audit +}); + +// Append events +export const appendEvent = async (aggregateId: string, event: any) => { + const version = await 
getNextVersion(aggregateId); + + await dynamodb.putItem({ + TableName: process.env.EVENT_STORE, + Item: { + aggregateId, + version, + eventType: event.type, + eventData: event.data, + timestamp: Date.now(), + userId: event.userId, + }, + ConditionExpression: 'attribute_not_exists(version)', // Optimistic locking + }); +}; + +// Rebuild state from events +export const rebuildState = async (aggregateId: string) => { + const events = await dynamodb.query({ + TableName: process.env.EVENT_STORE, + KeyConditionExpression: 'aggregateId = :id', + ExpressionAttributeValues: { ':id': aggregateId }, + ScanIndexForward: true, // Chronological order + }); + + let state = initialState(); + for (const event of events.Items) { + state = applyEvent(state, event); + } + + return state; +}; +``` + +### Pattern: Materialized Views + +Create read-optimized projections: + +```typescript +// Event store stream triggers projection +eventStore.grantStreamRead(projectionFunction); + +new lambda.EventSourceMapping(this, 'Projection', { + target: projectionFunction, + eventSourceArn: eventStore.tableStreamArn, + startingPosition: lambda.StartingPosition.LATEST, +}); + +// Projection function updates read model +export const updateProjection = async (event: DynamoDBStreamEvent) => { + for (const record of event.Records) { + if (record.eventName !== 'INSERT') continue; + + const eventData = record.dynamodb?.NewImage; + const aggregateId = eventData?.aggregateId.S; + + // Rebuild current state + const currentState = await rebuildState(aggregateId); + + // Update read model + await readModelTable.putItem({ + TableName: process.env.READ_MODEL_TABLE, + Item: currentState, + }); + } +}; +``` + +### Pattern: Snapshots + +Optimize event replay with snapshots: + +```typescript +export const createSnapshot = async (aggregateId: string) => { + // Rebuild state from all events + const state = await rebuildState(aggregateId); + const version = await getLatestVersion(aggregateId); + + // Store snapshot + await snapshotTable.putItem({ + TableName: process.env.SNAPSHOT_TABLE, + Item: { + aggregateId, + version, + state: JSON.stringify(state), + createdAt: Date.now(), + }, + }); +}; + +// Rebuild from snapshot + newer events +export const rebuildFromSnapshot = async (aggregateId: string) => { + // Get latest snapshot + const snapshot = await getLatestSnapshot(aggregateId); + + let state = JSON.parse(snapshot.state); + const snapshotVersion = snapshot.version; + + // Apply only events after snapshot + const events = await getEventsSinceVersion(aggregateId, snapshotVersion); + + for (const event of events) { + state = applyEvent(state, event); + } + + return state; +}; +``` + +## Saga Patterns + +### Pattern: Choreography-Based Saga + +Services coordinate through events: + +```typescript +// Order Service publishes event +export const placeOrder = async (order: Order) => { + await saveOrder(order); + + await eventBridge.putEvents({ + Entries: [{ + Source: 'orders', + DetailType: 'OrderPlaced', + Detail: JSON.stringify({ orderId: order.id }), + }], + }); +}; + +// Inventory Service reacts to event +new events.Rule(this, 'ReserveInventory', { + eventPattern: { + source: ['orders'], + detailType: ['OrderPlaced'], + }, + targets: [new targets.LambdaFunction(reserveInventoryFunction)], +}); + +// Inventory Service publishes result +export const reserveInventory = async (event: any) => { + const { orderId } = event.detail; + + try { + await reserve(orderId); + + await eventBridge.putEvents({ + Entries: [{ + Source: 'inventory', + DetailType: 
'InventoryReserved', + Detail: JSON.stringify({ orderId }), + }], + }); + } catch (error) { + await eventBridge.putEvents({ + Entries: [{ + Source: 'inventory', + DetailType: 'InventoryReservationFailed', + Detail: JSON.stringify({ orderId, error: error.message }), + }], + }); + } +}; + +// Payment Service reacts to inventory event +new events.Rule(this, 'ProcessPayment', { + eventPattern: { + source: ['inventory'], + detailType: ['InventoryReserved'], + }, + targets: [new targets.LambdaFunction(processPaymentFunction)], +}); +``` + +### Pattern: Orchestration-Based Saga + +Central coordinator manages saga: + +```typescript +// Step Functions orchestrates saga +const definition = new tasks.LambdaInvoke(this, 'ReserveInventory', { + lambdaFunction: reserveInventoryFunction, + resultPath: '$.inventory', +}) + .next(new tasks.LambdaInvoke(this, 'ProcessPayment', { + lambdaFunction: processPaymentFunction, + resultPath: '$.payment', + })) + .next(new tasks.LambdaInvoke(this, 'ShipOrder', { + lambdaFunction: shipOrderFunction, + resultPath: '$.shipment', + })) + .addCatch( + // Compensation flow + new tasks.LambdaInvoke(this, 'RefundPayment', { + lambdaFunction: refundFunction, + }) + .next(new tasks.LambdaInvoke(this, 'ReleaseInventory', { + lambdaFunction: releaseFunction, + })), + { + errors: ['States.TaskFailed'], + resultPath: '$.error', + } + ); + +new stepfunctions.StateMachine(this, 'OrderSaga', { + definition, + tracingEnabled: true, +}); +``` + +**Comparison**: + +| Aspect | Choreography | Orchestration | +|--------|--------------|---------------| +| Coordination | Decentralized | Centralized | +| Coupling | Loose | Tighter | +| Visibility | Distributed logs | Single execution history | +| Debugging | Harder (trace across services) | Easier (single workflow) | +| Best for | Simple flows | Complex flows | + +## Best Practices + +### Idempotency + +**Always make event handlers idempotent**: + +```typescript +// Use idempotency keys +export const handler = async (event: any) => { + const idempotencyKey = event.requestId || event.messageId; + + // Check if already processed + try { + const existing = await dynamodb.getItem({ + TableName: process.env.IDEMPOTENCY_TABLE, + Key: { idempotencyKey }, + }); + + if (existing.Item) { + console.log('Already processed:', idempotencyKey); + return existing.Item.result; // Return cached result + } + } catch (error) { + // First time processing + } + + // Process event + const result = await processEvent(event); + + // Store result + await dynamodb.putItem({ + TableName: process.env.IDEMPOTENCY_TABLE, + Item: { + idempotencyKey, + result, + processedAt: Date.now(), + }, + // Optional: Set TTL for cleanup + ExpirationTime: Math.floor(Date.now() / 1000) + 86400, // 24 hours + }); + + return result; +}; +``` + +### Event Versioning + +**Handle event schema evolution**: + +```typescript +// Version events +interface OrderPlacedEventV1 { + version: '1.0'; + orderId: string; + amount: number; +} + +interface OrderPlacedEventV2 { + version: '2.0'; + orderId: string; + amount: number; + currency: string; // New field +} + +// Handler supports multiple versions +export const handler = async (event: any) => { + const eventVersion = event.detail.version || '1.0'; + + switch (eventVersion) { + case '1.0': + return processV1(event.detail as OrderPlacedEventV1); + case '2.0': + return processV2(event.detail as OrderPlacedEventV2); + default: + throw new Error(`Unsupported event version: ${eventVersion}`); + } +}; + +const processV1 = async (event: 
OrderPlacedEventV1) => { + // Upgrade to V2 internally + const v2Event: OrderPlacedEventV2 = { + ...event, + version: '2.0', + currency: 'USD', // Default value + }; + return processV2(v2Event); +}; +``` + +### Eventual Consistency + +**Design for eventual consistency**: + +```typescript +// Service A writes to its database +export const createOrder = async (order: Order) => { + // Write to Order database + await orderTable.putItem({ Item: order }); + + // Publish event + await eventBridge.putEvents({ + Entries: [{ + Source: 'orders', + DetailType: 'OrderCreated', + Detail: JSON.stringify({ orderId: order.id }), + }], + }); +}; + +// Service B eventually updates its database +export const onOrderCreated = async (event: any) => { + const { orderId } = event.detail; + + // Fetch additional data + const orderDetails = await getOrderDetails(orderId); + + // Update inventory database (eventual consistency) + await inventoryTable.updateItem({ + Key: { productId: orderDetails.productId }, + UpdateExpression: 'SET reserved = reserved + :qty', + ExpressionAttributeValues: { ':qty': orderDetails.quantity }, + }); +}; +``` + +### Error Handling in EDA + +**Comprehensive error handling strategy**: + +```typescript +// Dead Letter Queue for failed events +const dlq = new sqs.Queue(this, 'EventDLQ', { + retentionPeriod: Duration.days(14), +}); + +// EventBridge rule with DLQ +new events.Rule(this, 'ProcessRule', { + eventPattern: { /* ... */ }, + targets: [ + new targets.LambdaFunction(processFunction, { + deadLetterQueue: dlq, + maxEventAge: Duration.hours(2), + retryAttempts: 2, + }), + ], +}); + +// Monitor DLQ +new cloudwatch.Alarm(this, 'DLQAlarm', { + metric: dlq.metricApproximateNumberOfMessagesVisible(), + threshold: 1, + evaluationPeriods: 1, +}); + +// DLQ processor for manual review +new lambda.EventSourceMapping(this, 'DLQProcessor', { + target: dlqProcessorFunction, + eventSourceArn: dlq.queueArn, + enabled: false, // Enable manually when reviewing +}); +``` + +### Message Ordering + +**When order matters**: + +```typescript +// SQS FIFO for strict ordering +const fifoQueue = new sqs.Queue(this, 'OrderedQueue', { + fifo: true, + contentBasedDeduplication: true, + deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP, + fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID, +}); + +// Publish with message group ID +await sqs.sendMessage({ + QueueUrl: process.env.QUEUE_URL, + MessageBody: JSON.stringify(event), + MessageGroupId: customerId, // All messages for same customer in order + MessageDeduplicationId: eventId, // Prevent duplicates +}); + +// Kinesis for ordered streams +const stream = new kinesis.Stream(this, 'Stream', { + shardCount: 1, // Single shard = strict ordering +}); + +// Partition key ensures same partition +await kinesis.putRecord({ + StreamName: process.env.STREAM_NAME, + Data: Buffer.from(JSON.stringify(event)), + PartitionKey: customerId, // Same key = same shard +}); +``` + +### Deduplication + +**Prevent duplicate event processing**: + +```typescript +// Content-based deduplication with SQS FIFO +const queue = new sqs.Queue(this, 'Queue', { + fifo: true, + contentBasedDeduplication: true, // Hash of message body +}); + +// Manual deduplication with DynamoDB +export const handler = async (event: any) => { + const eventId = event.id || event.messageId; + + try { + // Conditional write (fails if exists) + await dynamodb.putItem({ + TableName: process.env.DEDUP_TABLE, + Item: { + eventId, + processedAt: Date.now(), + ttl: Math.floor(Date.now() / 1000) + 
86400, // 24h TTL + }, + ConditionExpression: 'attribute_not_exists(eventId)', + }); + + // Event is unique, process it + await processEvent(event); + } catch (error) { + if (error.code === 'ConditionalCheckFailedException') { + console.log('Duplicate event ignored:', eventId); + return; // Already processed + } + throw error; // Other error + } +}; +``` + +### Backpressure Handling + +**Prevent overwhelming downstream systems**: + +```typescript +// Control Lambda concurrency +const consumerFunction = new lambda.Function(this, 'Consumer', { + reservedConcurrentExecutions: 10, // Max 10 concurrent +}); + +// SQS visibility timeout + retry logic +const queue = new sqs.Queue(this, 'Queue', { + visibilityTimeout: Duration.seconds(300), // 5 minutes + receiveMessageWaitTime: Duration.seconds(20), // Long polling +}); + +new lambda.EventSourceMapping(this, 'Consumer', { + target: consumerFunction, + eventSourceArn: queue.queueArn, + batchSize: 10, + maxConcurrency: 5, // Process 5 batches concurrently + reportBatchItemFailures: true, +}); + +// Circuit breaker pattern +let consecutiveFailures = 0; +const FAILURE_THRESHOLD = 5; + +export const handler = async (event: any) => { + // Check circuit breaker + if (consecutiveFailures >= FAILURE_THRESHOLD) { + console.error('Circuit breaker open, skipping processing'); + throw new Error('Circuit breaker open'); + } + + try { + await processEvent(event); + consecutiveFailures = 0; // Reset on success + } catch (error) { + consecutiveFailures++; + throw error; + } +}; +``` + +## Advanced Patterns + +### Pattern: Event Replay + +Replay events for recovery or testing: + +```typescript +// Archive events for replay +const archive = new events.Archive(this, 'Archive', { + sourceEventBus: eventBus, + eventPattern: { + account: [this.account], + }, + retention: Duration.days(365), +}); + +// Replay programmatically +export const replayEvents = async (startTime: Date, endTime: Date) => { + // Use AWS SDK to start replay + await eventBridge.startReplay({ + ReplayName: `replay-${Date.now()}`, + EventSourceArn: archive.archiveArn, + EventStartTime: startTime, + EventEndTime: endTime, + Destination: { + Arn: eventBus.eventBusArn, + }, + }); +}; +``` + +### Pattern: Event Time vs Processing Time + +Handle late-arriving events: + +```typescript +// Include event timestamp +interface Event { + eventId: string; + eventTime: string; // When event occurred + processingTime?: string; // When event processed + data: any; +} + +// Windowed aggregation +export const aggregateWindow = async (events: Event[]) => { + // Group by event time window (not processing time) + const windows = new Map(); + + for (const event of events) { + const window = getWindowForTime(new Date(event.eventTime), Duration.minutes(5)); + const key = window.toISOString(); + + if (!windows.has(key)) { + windows.set(key, []); + } + windows.get(key)!.push(event); + } + + // Process each window + for (const [window, eventsInWindow] of windows) { + await processWindow(window, eventsInWindow); + } +}; +``` + +### Pattern: Transactional Outbox + +Ensure event publishing with database writes: + +```typescript +// Single DynamoDB transaction +export const createOrderWithEvent = async (order: Order) => { + await dynamodb.transactWriteItems({ + TransactItems: [ + { + // Write order + Put: { + TableName: process.env.ORDERS_TABLE, + Item: marshall(order), + }, + }, + { + // Write outbox event + Put: { + TableName: process.env.OUTBOX_TABLE, + Item: marshall({ + eventId: uuid(), + eventType: 'OrderPlaced', + 
eventData: order, + status: 'PENDING', + createdAt: Date.now(), + }), + }, + }, + ], + }); +}; + +// Separate Lambda processes outbox +new lambda.EventSourceMapping(this, 'OutboxProcessor', { + target: outboxFunction, + eventSourceArn: outboxTable.tableStreamArn, + startingPosition: lambda.StartingPosition.LATEST, +}); + +export const processOutbox = async (event: DynamoDBStreamEvent) => { + for (const record of event.Records) { + if (record.eventName !== 'INSERT') continue; + + const outboxEvent = unmarshall(record.dynamodb?.NewImage); + + // Publish to EventBridge + await eventBridge.putEvents({ + Entries: [{ + Source: 'orders', + DetailType: outboxEvent.eventType, + Detail: JSON.stringify(outboxEvent.eventData), + }], + }); + + // Mark as processed + await dynamodb.updateItem({ + TableName: process.env.OUTBOX_TABLE, + Key: { eventId: outboxEvent.eventId }, + UpdateExpression: 'SET #status = :status', + ExpressionAttributeNames: { '#status': 'status' }, + ExpressionAttributeValues: { ':status': 'PUBLISHED' }, + }); + } +}; +``` + +## Testing Event-Driven Systems + +### Pattern: Event Replay for Testing + +```typescript +// Publish test events +export const publishTestEvents = async () => { + const testEvents = [ + { source: 'orders', detailType: 'OrderPlaced', detail: { orderId: '1' } }, + { source: 'orders', detailType: 'OrderPlaced', detail: { orderId: '2' } }, + ]; + + for (const event of testEvents) { + await eventBridge.putEvents({ Entries: [event] }); + } +}; + +// Monitor processing +export const verifyProcessing = async () => { + // Check downstream databases + const order1 = await orderTable.getItem({ Key: { orderId: '1' } }); + const order2 = await orderTable.getItem({ Key: { orderId: '2' } }); + + expect(order1.Item).toBeDefined(); + expect(order2.Item).toBeDefined(); +}; +``` + +### Pattern: Event Mocking + +```typescript +// Mock EventBridge in tests +const mockEventBridge = { + putEvents: jest.fn().mockResolvedValue({}), +}; + +// Test event publishing +test('publishes event on order creation', async () => { + await createOrder(mockEventBridge, order); + + expect(mockEventBridge.putEvents).toHaveBeenCalledWith({ + Entries: [ + expect.objectContaining({ + Source: 'orders', + DetailType: 'OrderPlaced', + }), + ], + }); +}); +``` + +## Summary + +- **Loose Coupling**: Services communicate via events, not direct calls +- **Async Processing**: Use queues and event buses for asynchronous workflows +- **Idempotency**: Always handle duplicate events gracefully +- **Dead Letter Queues**: Configure DLQs for error handling +- **Event Contracts**: Define clear schemas for events +- **Observability**: Enable tracing and monitoring across services +- **Eventual Consistency**: Design for it, don't fight it +- **Saga Patterns**: Use for distributed transactions +- **Event Sourcing**: Store events as source of truth when needed diff --git a/skills/aws-serverless-eda/references/observability-best-practices.md b/skills/aws-serverless-eda/references/observability-best-practices.md new file mode 100644 index 0000000..b6539db --- /dev/null +++ b/skills/aws-serverless-eda/references/observability-best-practices.md @@ -0,0 +1,770 @@ +# Serverless Observability Best Practices + +Comprehensive observability patterns for serverless applications based on AWS best practices. 
+ +## Table of Contents + +- [Three Pillars of Observability](#three-pillars-of-observability) +- [Metrics](#metrics) +- [Logging](#logging) +- [Tracing](#tracing) +- [Unified Observability](#unified-observability) +- [Alerting](#alerting) + +## Three Pillars of Observability + +### Metrics +**Numeric data measured at intervals (time series)** +- Request rate, error rate, duration +- CPU%, memory%, disk% +- Custom business metrics +- Service Level Indicators (SLIs) + +### Logs +**Timestamped records of discrete events** +- Application events and errors +- State transformations +- Debugging information +- Audit trails + +### Traces +**Single user's journey across services** +- Request flow through distributed system +- Service dependencies +- Latency breakdown +- Error propagation + +## Metrics + +### CloudWatch Metrics for Lambda + +**Out-of-the-box metrics** (automatically available): +``` +- Invocations +- Errors +- Throttles +- Duration +- ConcurrentExecutions +- IteratorAge (for streams) +``` + +**CDK Configuration**: +```typescript +const fn = new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', +}); + +// Create alarms on metrics +new cloudwatch.Alarm(this, 'ErrorAlarm', { + metric: fn.metricErrors({ + statistic: 'Sum', + period: Duration.minutes(5), + }), + threshold: 10, + evaluationPeriods: 1, +}); + +new cloudwatch.Alarm(this, 'DurationAlarm', { + metric: fn.metricDuration({ + statistic: 'p99', + period: Duration.minutes(5), + }), + threshold: 1000, // 1 second + evaluationPeriods: 2, +}); +``` + +### Custom Metrics + +**Use CloudWatch Embedded Metric Format (EMF)**: + +```typescript +export const handler = async (event: any) => { + const startTime = Date.now(); + + try { + const result = await processOrder(event); + + // Emit custom metrics + console.log(JSON.stringify({ + _aws: { + Timestamp: Date.now(), + CloudWatchMetrics: [{ + Namespace: 'MyApp/Orders', + Dimensions: [['ServiceName', 'Operation']], + Metrics: [ + { Name: 'ProcessingTime', Unit: 'Milliseconds' }, + { Name: 'OrderValue', Unit: 'None' }, + ], + }], + }, + ServiceName: 'OrderService', + Operation: 'ProcessOrder', + ProcessingTime: Date.now() - startTime, + OrderValue: result.amount, + })); + + return result; + } catch (error) { + // Emit error metric + console.log(JSON.stringify({ + _aws: { + CloudWatchMetrics: [{ + Namespace: 'MyApp/Orders', + Dimensions: [['ServiceName']], + Metrics: [{ Name: 'Errors', Unit: 'Count' }], + }], + }, + ServiceName: 'OrderService', + Errors: 1, + })); + + throw error; + } +}; +``` + +**Using Lambda Powertools**: + +```typescript +import { Metrics, MetricUnits } from '@aws-lambda-powertools/metrics'; + +const metrics = new Metrics({ + namespace: 'MyApp', + serviceName: 'OrderService', +}); + +export const handler = async (event: any) => { + metrics.addMetric('Invocation', MetricUnits.Count, 1); + + const startTime = Date.now(); + + try { + const result = await processOrder(event); + + metrics.addMetric('Success', MetricUnits.Count, 1); + metrics.addMetric('ProcessingTime', MetricUnits.Milliseconds, Date.now() - startTime); + metrics.addMetric('OrderValue', MetricUnits.None, result.amount); + + return result; + } catch (error) { + metrics.addMetric('Error', MetricUnits.Count, 1); + throw error; + } finally { + metrics.publishStoredMetrics(); + } +}; +``` + +## Logging + +### Structured Logging + +**Use JSON format for logs**: + +```typescript +// ✅ GOOD - Structured JSON logging +export const handler = async (event: any) => { + console.log(JSON.stringify({ + level: 
'INFO', + message: 'Processing order', + orderId: event.orderId, + customerId: event.customerId, + timestamp: new Date().toISOString(), + requestId: context.requestId, + })); + + try { + const result = await processOrder(event); + + console.log(JSON.stringify({ + level: 'INFO', + message: 'Order processed successfully', + orderId: event.orderId, + duration: Date.now() - startTime, + timestamp: new Date().toISOString(), + })); + + return result; + } catch (error) { + console.error(JSON.stringify({ + level: 'ERROR', + message: 'Order processing failed', + orderId: event.orderId, + error: { + name: error.name, + message: error.message, + stack: error.stack, + }, + timestamp: new Date().toISOString(), + })); + + throw error; + } +}; + +// ❌ BAD - Unstructured logging +console.log('Processing order ' + orderId + ' for customer ' + customerId); +``` + +**Using Lambda Powertools Logger**: + +```typescript +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ + serviceName: 'OrderService', + logLevel: 'INFO', +}); + +export const handler = async (event: any, context: Context) => { + logger.addContext(context); + + logger.info('Processing order', { + orderId: event.orderId, + customerId: event.customerId, + }); + + try { + const result = await processOrder(event); + + logger.info('Order processed', { + orderId: event.orderId, + amount: result.amount, + }); + + return result; + } catch (error) { + logger.error('Order processing failed', { + orderId: event.orderId, + error, + }); + + throw error; + } +}; +``` + +### Log Levels + +**Use appropriate log levels**: +- **ERROR**: Errors requiring immediate attention +- **WARN**: Warnings or recoverable errors +- **INFO**: Important business events +- **DEBUG**: Detailed debugging information (disable in production) + +```typescript +const logger = new Logger({ + serviceName: 'OrderService', + logLevel: process.env.LOG_LEVEL || 'INFO', +}); + +logger.debug('Detailed processing info', { data }); +logger.info('Business event occurred', { event }); +logger.warn('Recoverable error', { error }); +logger.error('Critical failure', { error }); +``` + +### Log Insights Queries + +**Common CloudWatch Logs Insights queries**: + +``` +# Find errors in last hour +fields @timestamp, @message, level, error.message +| filter level = "ERROR" +| sort @timestamp desc +| limit 100 + +# Count errors by type +stats count() by error.name as ErrorType +| sort count desc + +# Calculate p99 latency +stats percentile(duration, 99) by serviceName + +# Find slow requests +fields @timestamp, orderId, duration +| filter duration > 1000 +| sort duration desc +| limit 50 + +# Track specific customer requests +fields @timestamp, @message, orderId +| filter customerId = "customer-123" +| sort @timestamp desc +``` + +## Tracing + +### Enable X-Ray Tracing + +**Configure X-Ray for Lambda**: + +```typescript +const fn = new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + tracing: lambda.Tracing.ACTIVE, // Enable X-Ray +}); + +// API Gateway tracing +const api = new apigateway.RestApi(this, 'Api', { + deployOptions: { + tracingEnabled: true, + }, +}); + +// Step Functions tracing +new stepfunctions.StateMachine(this, 'StateMachine', { + definition, + tracingEnabled: true, +}); +``` + +**Instrument application code**: + +```typescript +import { captureAWSv3Client } from 'aws-xray-sdk-core'; +import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; + +// Wrap AWS SDK clients +const client = captureAWSv3Client(new DynamoDBClient({})); + +// 
Custom segments +import AWSXRay from 'aws-xray-sdk-core'; + +export const handler = async (event: any) => { + const segment = AWSXRay.getSegment(); + + // Custom subsegment + const subsegment = segment.addNewSubsegment('ProcessOrder'); + + try { + // Add annotations (indexed for filtering) + subsegment.addAnnotation('orderId', event.orderId); + subsegment.addAnnotation('customerId', event.customerId); + + // Add metadata (not indexed, detailed info) + subsegment.addMetadata('orderDetails', event); + + const result = await processOrder(event); + + subsegment.addAnnotation('status', 'success'); + subsegment.close(); + + return result; + } catch (error) { + subsegment.addError(error); + subsegment.close(); + throw error; + } +}; +``` + +**Using Lambda Powertools Tracer**: + +```typescript +import { Tracer } from '@aws-lambda-powertools/tracer'; + +const tracer = new Tracer({ serviceName: 'OrderService' }); + +export const handler = async (event: any) => { + const segment = tracer.getSegment(); + + // Automatically captures and traces + const result = await tracer.captureAWSv3Client(dynamodb).getItem({ + TableName: process.env.TABLE_NAME, + Key: { orderId: event.orderId }, + }); + + // Custom annotation + tracer.putAnnotation('orderId', event.orderId); + tracer.putMetadata('orderDetails', event); + + return result; +}; +``` + +### Service Map + +**Visualize service dependencies** with X-Ray: +- Shows service-to-service communication +- Identifies latency bottlenecks +- Highlights error rates between services +- Tracks downstream dependencies + +### Distributed Tracing Best Practices + +1. **Enable tracing everywhere**: Lambda, API Gateway, Step Functions +2. **Use annotations for filtering**: Indexed fields for queries +3. **Use metadata for details**: Non-indexed detailed information +4. **Sample appropriately**: 100% for low traffic, sampled for high traffic +5. 
**Correlate with logs**: Include trace ID in log entries + +## Unified Observability + +### Correlation Between Pillars + +**Include trace ID in logs**: + +```typescript +export const handler = async (event: any, context: Context) => { + const traceId = process.env._X_AMZN_TRACE_ID; + + console.log(JSON.stringify({ + level: 'INFO', + message: 'Processing order', + traceId, + requestId: context.requestId, + orderId: event.orderId, + })); +}; +``` + +### CloudWatch ServiceLens + +**Unified view of traces and metrics**: +- Automatically correlates X-Ray traces with CloudWatch metrics +- Shows service map with metrics overlay +- Identifies performance and availability issues +- Provides end-to-end request view + +### Lambda Powertools Integration + +**All three pillars in one**: + +```typescript +import { Logger } from '@aws-lambda-powertools/logger'; +import { Tracer } from '@aws-lambda-powertools/tracer'; +import { Metrics, MetricUnits } from '@aws-lambda-powertools/metrics'; + +const logger = new Logger({ serviceName: 'OrderService' }); +const tracer = new Tracer({ serviceName: 'OrderService' }); +const metrics = new Metrics({ namespace: 'MyApp', serviceName: 'OrderService' }); + +export const handler = async (event: any, context: Context) => { + // Automatically adds trace context to logs + logger.addContext(context); + + logger.info('Processing order', { orderId: event.orderId }); + + // Add trace annotations + tracer.putAnnotation('orderId', event.orderId); + + // Add metrics + metrics.addMetric('Invocation', MetricUnits.Count, 1); + + const startTime = Date.now(); + + try { + const result = await processOrder(event); + + metrics.addMetric('Success', MetricUnits.Count, 1); + metrics.addMetric('Duration', MetricUnits.Milliseconds, Date.now() - startTime); + + logger.info('Order processed', { orderId: event.orderId }); + + return result; + } catch (error) { + metrics.addMetric('Error', MetricUnits.Count, 1); + logger.error('Processing failed', { orderId: event.orderId, error }); + throw error; + } finally { + metrics.publishStoredMetrics(); + } +}; +``` + +## Alerting + +### Effective Alerting Strategy + +**Alert on what matters**: +- **Critical**: Customer-impacting issues (errors, high latency) +- **Warning**: Approaching thresholds (80% capacity) +- **Info**: Trends and anomalies (cost spikes) + +**Alarm fatigue prevention**: +- Tune thresholds based on actual patterns +- Use composite alarms to reduce noise +- Set appropriate evaluation periods +- Include clear remediation steps + +### CloudWatch Alarms + +**Common alarm patterns**: + +```typescript +// Error rate alarm +new cloudwatch.Alarm(this, 'ErrorRateAlarm', { + metric: new cloudwatch.MathExpression({ + expression: 'errors / invocations * 100', + usingMetrics: { + errors: fn.metricErrors({ statistic: 'Sum' }), + invocations: fn.metricInvocations({ statistic: 'Sum' }), + }, + }), + threshold: 1, // 1% error rate + evaluationPeriods: 2, + alarmDescription: 'Error rate exceeded 1%', +}); + +// Latency alarm (p99) +new cloudwatch.Alarm(this, 'LatencyAlarm', { + metric: fn.metricDuration({ + statistic: 'p99', + period: Duration.minutes(5), + }), + threshold: 1000, // 1 second + evaluationPeriods: 2, + alarmDescription: 'p99 latency exceeded 1 second', +}); + +// Concurrent executions approaching limit +new cloudwatch.Alarm(this, 'ConcurrencyAlarm', { + metric: fn.metricConcurrentExecutions({ + statistic: 'Maximum', + }), + threshold: 800, // 80% of 1000 default limit + evaluationPeriods: 1, + alarmDescription: 'Approaching 
concurrency limit', +}); +``` + +### Composite Alarms + +**Reduce alert noise**: + +```typescript +const errorAlarm = new cloudwatch.Alarm(this, 'Errors', { + metric: fn.metricErrors(), + threshold: 10, + evaluationPeriods: 1, +}); + +const throttleAlarm = new cloudwatch.Alarm(this, 'Throttles', { + metric: fn.metricThrottles(), + threshold: 5, + evaluationPeriods: 1, +}); + +const latencyAlarm = new cloudwatch.Alarm(this, 'Latency', { + metric: fn.metricDuration({ statistic: 'p99' }), + threshold: 2000, + evaluationPeriods: 2, +}); + +// Composite alarm (any of the above) +new cloudwatch.CompositeAlarm(this, 'ServiceHealthAlarm', { + compositeAlarmName: 'order-service-health', + alarmRule: cloudwatch.AlarmRule.anyOf( + errorAlarm, + throttleAlarm, + latencyAlarm + ), + alarmDescription: 'Overall service health degraded', +}); +``` + +## Dashboard Best Practices + +### Service Dashboard Layout + +**Recommended sections**: + +1. **Overview**: + - Total invocations + - Error rate percentage + - P50, P95, P99 latency + - Availability percentage + +2. **Resource Utilization**: + - Concurrent executions + - Memory utilization + - Duration distribution + - Throttles + +3. **Business Metrics**: + - Orders processed + - Revenue per minute + - Customer activity + - Feature usage + +4. **Errors and Alerts**: + - Error count by type + - Active alarms + - DLQ message count + - Failed transactions + +### CloudWatch Dashboard CDK + +```typescript +const dashboard = new cloudwatch.Dashboard(this, 'ServiceDashboard', { + dashboardName: 'order-service', +}); + +dashboard.addWidgets( + // Row 1: Overview + new cloudwatch.GraphWidget({ + title: 'Invocations', + left: [fn.metricInvocations()], + }), + new cloudwatch.SingleValueWidget({ + title: 'Error Rate', + metrics: [ + new cloudwatch.MathExpression({ + expression: 'errors / invocations * 100', + usingMetrics: { + errors: fn.metricErrors({ statistic: 'Sum' }), + invocations: fn.metricInvocations({ statistic: 'Sum' }), + }, + }), + ], + }), + new cloudwatch.GraphWidget({ + title: 'Latency (p50, p95, p99)', + left: [ + fn.metricDuration({ statistic: 'p50', label: 'p50' }), + fn.metricDuration({ statistic: 'p95', label: 'p95' }), + fn.metricDuration({ statistic: 'p99', label: 'p99' }), + ], + }) +); + +// Row 2: Errors +dashboard.addWidgets( + new cloudwatch.LogQueryWidget({ + title: 'Recent Errors', + logGroupNames: [fn.logGroup.logGroupName], + queryLines: [ + 'fields @timestamp, @message', + 'filter level = "ERROR"', + 'sort @timestamp desc', + 'limit 20', + ], + }) +); +``` + +## Monitoring Serverless Architectures + +### End-to-End Monitoring + +**Monitor the entire flow**: + +``` +API Gateway → Lambda → DynamoDB → EventBridge → Lambda + ↓ ↓ ↓ ↓ ↓ + Metrics Traces Metrics Metrics Logs +``` + +**Key metrics per service**: + +| Service | Key Metrics | +|---------|-------------| +| API Gateway | Count, 4XXError, 5XXError, Latency, CacheHitCount | +| Lambda | Invocations, Errors, Duration, Throttles, ConcurrentExecutions | +| DynamoDB | ConsumedReadCapacity, ConsumedWriteCapacity, UserErrors, SystemErrors | +| SQS | NumberOfMessagesSent, NumberOfMessagesReceived, ApproximateAgeOfOldestMessage | +| EventBridge | Invocations, FailedInvocations, TriggeredRules | +| Step Functions | ExecutionsStarted, ExecutionsFailed, ExecutionTime | + +### Synthetic Monitoring + +**Use CloudWatch Synthetics for API monitoring**: + +```typescript +import { Canary, Test, Code, Schedule } from '@aws-cdk/aws-synthetics-alpha'; + +new Canary(this, 'ApiCanary', { + canaryName: 
'api-health-check', + schedule: Schedule.rate(Duration.minutes(5)), + test: Test.custom({ + code: Code.fromInline(` + const synthetics = require('Synthetics'); + + const apiCanaryBlueprint = async function () { + const response = await synthetics.executeHttpStep('Verify API', { + url: 'https://api.example.com/health', + method: 'GET', + }); + + return response.statusCode === 200 ? 'success' : 'failure'; + }; + + exports.handler = async () => { + return await apiCanaryBlueprint(); + }; + `), + handler: 'index.handler', + }), + runtime: synthetics.Runtime.SYNTHETICS_NODEJS_PUPPETEER_6_2, +}); +``` + +## OpenTelemetry Integration + +### Amazon Distro for OpenTelemetry (ADOT) + +**Use ADOT for vendor-neutral observability**: + +```typescript +// Lambda Layer with ADOT +const adotLayer = lambda.LayerVersion.fromLayerVersionArn( + this, + 'AdotLayer', + `arn:aws:lambda:${this.region}:901920570463:layer:aws-otel-nodejs-amd64-ver-1-18-1:4` +); + +new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + layers: [adotLayer], + tracing: lambda.Tracing.ACTIVE, + environment: { + AWS_LAMBDA_EXEC_WRAPPER: '/opt/otel-handler', + OPENTELEMETRY_COLLECTOR_CONFIG_FILE: '/var/task/collector.yaml', + }, +}); +``` + +**Benefits of ADOT**: +- Vendor-neutral (works with Datadog, New Relic, Honeycomb, etc.) +- Automatic instrumentation +- Consistent format across services +- Export to multiple backends + +## Best Practices Summary + +### Metrics +- ✅ Use CloudWatch Embedded Metric Format (EMF) +- ✅ Track business metrics, not just technical metrics +- ✅ Set alarms on error rate, latency, and throughput +- ✅ Use p99 for latency, not average +- ✅ Create dashboards for key services + +### Logging +- ✅ Use structured JSON logging +- ✅ Include correlation IDs (request ID, trace ID) +- ✅ Use appropriate log levels +- ✅ Never log sensitive data (PII, secrets) +- ✅ Use CloudWatch Logs Insights for analysis + +### Tracing +- ✅ Enable X-Ray tracing on all services +- ✅ Instrument AWS SDK calls +- ✅ Add custom annotations for business context +- ✅ Use service map to understand dependencies +- ✅ Correlate traces with logs and metrics + +### Alerting +- ✅ Alert on customer-impacting issues +- ✅ Tune thresholds to reduce false positives +- ✅ Use composite alarms to reduce noise +- ✅ Include clear remediation steps +- ✅ Escalate critical alarms appropriately + +### Tools +- ✅ Use Lambda Powertools for unified observability +- ✅ Use CloudWatch ServiceLens for service view +- ✅ Use Synthetics for proactive monitoring +- ✅ Consider ADOT for vendor-neutral observability diff --git a/skills/aws-serverless-eda/references/performance-optimization.md b/skills/aws-serverless-eda/references/performance-optimization.md new file mode 100644 index 0000000..3021f54 --- /dev/null +++ b/skills/aws-serverless-eda/references/performance-optimization.md @@ -0,0 +1,671 @@ +# Serverless Performance Optimization + +Performance optimization best practices for AWS Lambda and serverless architectures. + +## Table of Contents + +- [Lambda Execution Lifecycle](#lambda-execution-lifecycle) +- [Cold Start Optimization](#cold-start-optimization) +- [Memory and CPU Optimization](#memory-and-cpu-optimization) +- [Package Size Optimization](#package-size-optimization) +- [Initialization Optimization](#initialization-optimization) +- [Runtime Performance](#runtime-performance) + +## Lambda Execution Lifecycle + +### Execution Environment Phases + +**Three phases of Lambda execution**: + +1. 
**Init Phase** (Cold Start): + - Download and unpack function package + - Create execution environment + - Initialize runtime + - Execute initialization code (outside handler) + +2. **Invoke Phase**: + - Execute handler code + - Return response + - Freeze execution environment + +3. **Shutdown Phase**: + - Runtime shutdown (after period of inactivity) + - Execution environment destroyed + +### Concurrency and Scaling + +**Key concepts**: +- **Concurrency**: Number of execution environments serving requests simultaneously +- **One event per environment**: Each environment processes one event at a time +- **Automatic scaling**: Lambda creates new environments as needed +- **Environment reuse**: Warm starts reuse existing environments + +**Example**: +- Function takes 100ms to execute +- Single environment can handle 10 requests/second +- 100 concurrent requests = 10 environments needed +- Default account limit: 1,000 concurrent executions (can be raised) + +## Cold Start Optimization + +### Understanding Cold Starts + +**Cold start components**: +``` +Total Cold Start = Download Package + Init Environment + Init Code + Handler +``` + +**Cold start frequency**: +- Development: Every code change creates new environments (frequent) +- Production: Typically < 1% of invocations +- Optimize for p95/p99 latency, not average + +### Package Size Optimization + +**Minimize deployment package**: + +```typescript +new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + bundling: { + minify: true, // Minify production code + sourceMap: false, // Disable in production + externalModules: [ + '@aws-sdk/*', // Use AWS SDK from runtime + ], + // Tree-shaking removes unused code + }, +}); +``` + +**Tools for optimization**: +- **esbuild**: Automatic tree-shaking and minification +- **Webpack**: Bundle optimization +- **Maven**: Dependency analysis +- **Gradle**: Unused dependency detection + +**Best practices**: +1. Avoid monolithic functions +2. Bundle only required dependencies +3. Use tree-shaking to remove unused code +4. Minify production code +5. Exclude AWS SDK (provided by runtime) + +### Provisioned Concurrency + +**Pre-initialize environments for predictable latency**: + +```typescript +const fn = new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', +}); + +// Static provisioned concurrency +fn.currentVersion.addAlias('live', { + provisionedConcurrentExecutions: 10, +}); + +// Auto-scaling provisioned concurrency +const alias = fn.currentVersion.addAlias('prod'); + +const target = new applicationautoscaling.ScalableTarget(this, 'ScalableTarget', { + serviceNamespace: applicationautoscaling.ServiceNamespace.LAMBDA, + maxCapacity: 100, + minCapacity: 10, + resourceId: `function:${fn.functionName}:${alias.aliasName}`, + scalableDimension: 'lambda:function:ProvisionedConcurrentExecutions', +}); + +target.scaleOnUtilization({ + utilizationTarget: 0.7, // 70% utilization +}); +``` + +**When to use**: +- **Consistent traffic patterns**: Predictable load +- **Latency-sensitive APIs**: Sub-100ms requirements +- **Cost consideration**: Compare cold start frequency vs. 
provisioned cost
+
+**Cost comparison**:
+- **On-demand**: Pay only for actual usage
+- **Provisioned**: Pay for provisioned capacity + invocations
+- **Breakeven**: When cold starts > ~20% of invocations
+
+### Lambda SnapStart (Java)
+
+**Instant cold starts for Java**:
+
+```typescript
+new lambda.Function(this, 'JavaFunction', {
+  runtime: lambda.Runtime.JAVA_17,
+  code: lambda.Code.fromAsset('target/function.jar'),
+  handler: 'com.example.Handler::handleRequest',
+  snapStart: lambda.SnapStartConf.ON_PUBLISHED_VERSIONS,
+});
+```
+
+**Benefits**:
+- Up to 10x faster cold starts for Java
+- No code changes required
+- Works with published versions
+- No additional cost
+
+## Memory and CPU Optimization
+
+### Memory = CPU Allocation
+
+**Key principle**: Memory and CPU are proportionally allocated
+
+| Memory | vCPU |
+|--------|------|
+| 128 MB | 0.07 vCPU |
+| 512 MB | 0.28 vCPU |
+| 1,024 MB | 0.57 vCPU |
+| 1,769 MB | 1.00 vCPU |
+| 3,538 MB | 2.00 vCPU |
+| 10,240 MB | 6.00 vCPU |
+
+### Cost vs. Performance Balancing
+
+**Example - Compute-intensive function**:
+
+| Memory | Duration | Cost |
+|--------|----------|------|
+| 128 MB | 11.72s | $0.0246 |
+| 256 MB | 6.68s | $0.0280 |
+| 512 MB | 3.19s | $0.0268 |
+| 1024 MB | 1.46s | $0.0246 |
+
+**Key insight**: More memory = faster execution = similar or lower cost
+
+**Formula**:
+```
+GB-seconds per invocation = Allocated Memory (GB) × Execution Time (seconds)
+Compute cost = GB-seconds per invocation × Number of Invocations × Price per GB-second
+```
+
+### Finding Optimal Memory
+
+**Use Lambda Power Tuning**:
+
+```bash
+# Deploy power tuning state machine
+sam deploy --template-file template.yml --stack-name lambda-power-tuning
+
+# Run power tuning by executing the state machine
+aws stepfunctions start-execution \
+  --state-machine-arn <power-tuning-state-machine-arn> \
+  --input '{"lambdaARN": "arn:aws:lambda:...", "powerValues": [128, 256, 512, 1024, 1536, 3008]}'
+```
+
+**Manual testing approach**:
+1. Test function at different memory levels
+2. Measure execution time at each level
+3. Calculate cost for each configuration
+4. 
Choose optimal balance for your use case + +### Multi-Core Optimization + +**Leverage multiple vCPUs** (at 1,769 MB+): + +```typescript +// Use Worker Threads for parallel processing +import { Worker } from 'worker_threads'; + +export const handler = async (event: any) => { + const items = event.items; + + // Process in parallel using multiple cores + const workers = items.map(item => + new Promise((resolve, reject) => { + const worker = new Worker('./worker.js', { + workerData: item, + }); + + worker.on('message', resolve); + worker.on('error', reject); + }) + ); + + const results = await Promise.all(workers); + return results; +}; +``` + +**Python multiprocessing**: + +```python +import multiprocessing as mp + +def handler(event, context): + items = event['items'] + + # Use multiple cores for CPU-bound work + with mp.Pool(mp.cpu_count()) as pool: + results = pool.map(process_item, items) + + return {'results': results} +``` + +## Initialization Optimization + +### Code Outside Handler + +**Initialize once, reuse across invocations**: + +```typescript +// ✅ GOOD - Initialize outside handler +import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; +import { S3Client } from '@aws-sdk/client-s3'; + +// Initialized once per execution environment +const dynamodb = new DynamoDBClient({}); +const s3 = new S3Client({}); + +// Connection pool initialized once +const pool = createConnectionPool({ + host: process.env.DB_HOST, + max: 1, // One connection per execution environment +}); + +export const handler = async (event: any) => { + // Reuse connections across invocations + const data = await dynamodb.getItem({ /* ... */ }); + const file = await s3.getObject({ /* ... */ }); + return processData(data, file); +}; + +// ❌ BAD - Initialize in handler +export const handler = async (event: any) => { + const dynamodb = new DynamoDBClient({}); // Created every invocation + const s3 = new S3Client({}); // Created every invocation + // ... +}; +``` + +### Lazy Loading + +**Load dependencies only when needed**: + +```typescript +// ✅ GOOD - Conditional loading +export const handler = async (event: any) => { + if (event.operation === 'generatePDF') { + // Load heavy PDF library only when needed + const pdfLib = await import('./pdf-generator'); + return pdfLib.generatePDF(event.data); + } + + if (event.operation === 'processImage') { + const sharp = await import('sharp'); + return processImage(sharp, event.data); + } + + // Default operation (no heavy dependencies) + return processDefault(event); +}; + +// ❌ BAD - Load everything upfront +import pdfLib from './pdf-generator'; // 50MB +import sharp from 'sharp'; // 20MB +// Even if not used! 
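+// Both heavy modules are parsed and initialized during the Init phase,
+// so every cold start pays their cost even when the invocation never
+// touches the PDF or image code path.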
+ +export const handler = async (event: any) => { + if (event.operation === 'generatePDF') { + return pdfLib.generatePDF(event.data); + } +}; +``` + +### Connection Reuse + +**Enable connection reuse**: + +```typescript +import { DynamoDBClient } from '@aws-sdk/client-dynamodb'; + +const client = new DynamoDBClient({ + // Enable keep-alive for connection reuse + requestHandler: { + connectionTimeout: 3000, + socketTimeout: 3000, + }, +}); + +// For Node.js AWS SDK +process.env.AWS_NODEJS_CONNECTION_REUSE_ENABLED = '1'; +``` + +## Runtime Performance + +### Choose the Right Runtime + +**Runtime comparison**: + +| Runtime | Cold Start | Execution Speed | Ecosystem | Best For | +|---------|------------|-----------------|-----------|----------| +| Node.js 20 | Fast | Fast | Excellent | APIs, I/O-bound | +| Python 3.12 | Fast | Medium | Excellent | Data processing | +| Java 17 + SnapStart | Fast (w/SnapStart) | Fast | Good | Enterprise apps | +| .NET 8 | Medium | Fast | Good | Enterprise apps | +| Go | Very Fast | Very Fast | Good | High performance | +| Rust | Very Fast | Very Fast | Growing | High performance | + +### Optimize Handler Code + +**Efficient code patterns**: + +```typescript +// ✅ GOOD - Batch operations +const items = ['item1', 'item2', 'item3']; + +// Single batch write +await dynamodb.batchWriteItem({ + RequestItems: { + [tableName]: items.map(item => ({ + PutRequest: { Item: item }, + })), + }, +}); + +// ❌ BAD - Multiple single operations +for (const item of items) { + await dynamodb.putItem({ + TableName: tableName, + Item: item, + }); // Slow, multiple round trips +} +``` + +### Async Processing + +**Use async/await effectively**: + +```typescript +// ✅ GOOD - Parallel async operations +const [userData, orderData, inventoryData] = await Promise.all([ + getUserData(userId), + getOrderData(orderId), + getInventoryData(productId), +]); + +// ❌ BAD - Sequential async operations +const userData = await getUserData(userId); +const orderData = await getOrderData(orderId); // Waits unnecessarily +const inventoryData = await getInventoryData(productId); // Waits unnecessarily +``` + +### Caching Strategies + +**Cache frequently accessed data**: + +```typescript +// In-memory cache (persists in warm environments) +const cache = new Map(); + +export const handler = async (event: any) => { + const key = event.key; + + // Check cache first + if (cache.has(key)) { + console.log('Cache hit'); + return cache.get(key); + } + + // Fetch from database + const data = await fetchFromDatabase(key); + + // Store in cache + cache.set(key, data); + + return data; +}; +``` + +**ElastiCache for shared cache**: + +```typescript +import Redis from 'ioredis'; + +// Initialize once +const redis = new Redis({ + host: process.env.REDIS_HOST, + port: 6379, + lazyConnect: true, + enableOfflineQueue: false, +}); + +export const handler = async (event: any) => { + const key = `order:${event.orderId}`; + + // Try cache + const cached = await redis.get(key); + if (cached) { + return JSON.parse(cached); + } + + // Fetch and cache + const data = await fetchOrder(event.orderId); + await redis.setex(key, 300, JSON.stringify(data)); // 5 min TTL + + return data; +}; +``` + +## Performance Testing + +### Load Testing + +**Use Artillery for load testing**: + +```yaml +# load-test.yml +config: + target: https://api.example.com + phases: + - duration: 60 + arrivalRate: 10 + rampTo: 100 # Ramp from 10 to 100 req/sec +scenarios: + - flow: + - post: + url: /orders + json: + orderId: "{{ $randomString() }}" + amount: "{{ 
$randomNumber(10, 1000) }}"
+```
+
+```bash
+artillery run load-test.yml
+```
+
+### Benchmarking
+
+**Test different configurations**:
+
+```typescript
+// benchmark.ts
+import { Lambda } from '@aws-sdk/client-lambda';
+
+const lambda = new Lambda({});
+
+const testConfigurations = [
+  { memory: 128, name: 'Function-128' },
+  { memory: 256, name: 'Function-256' },
+  { memory: 512, name: 'Function-512' },
+  { memory: 1024, name: 'Function-1024' },
+];
+
+for (const config of testConfigurations) {
+  const times: number[] = [];
+
+  // Warm up
+  for (let i = 0; i < 5; i++) {
+    await lambda.invoke({ FunctionName: config.name });
+  }
+
+  // Measure
+  for (let i = 0; i < 100; i++) {
+    const start = Date.now();
+    await lambda.invoke({ FunctionName: config.name });
+    times.push(Date.now() - start);
+  }
+
+  // Sort numerically before reading percentiles
+  const sorted = [...times].sort((a, b) => a - b);
+  const p99 = sorted[Math.floor(sorted.length * 0.99) - 1];
+  const avg = times.reduce((a, b) => a + b, 0) / times.length;
+
+  console.log(`${config.memory}MB - Avg: ${avg}ms, p99: ${p99}ms`);
+}
+```
+
+## Cost Optimization
+
+### Right-Sizing Memory
+
+**Balance cost and performance**:
+
+**CPU-bound workloads**:
+- More memory = more CPU = faster execution
+- Often results in lower cost overall
+- Test at 1769MB (1 vCPU) and above
+
+**I/O-bound workloads**:
+- Less sensitive to memory allocation
+- May not benefit from higher memory
+- Test at lower memory levels (256-512MB)
+
+**Simple operations**:
+- Minimal CPU required
+- Use minimum memory (128-256MB)
+- Fast execution despite low resources
+
+### Billing Granularity
+
+**Lambda bills in 1ms increments**:
+- Precise billing (7ms execution = 7ms cost)
+- Optimize even small improvements
+- Consider trade-offs carefully
+
+**Cost calculation**:
+```
+Cost = (Memory GB) × (Duration seconds) × (Invocations) × ($0.0000166667/GB-second)
+       + (Invocations) × ($0.20/1M requests)
+```
+
+### Cost Reduction Strategies
+
+1. **Optimize execution time**: Faster = cheaper
+2. **Right-size memory**: Balance CPU needs with cost
+3. **Reduce invocations**: Batch processing, caching
+4. **Use Graviton2**: 20% better price/performance
+5. **Provisioned Concurrency**: Use only where cold-start latency justifies the cost
+6. 
**Compression**: Reduce data transfer costs + +## Advanced Optimization + +### Lambda Extensions + +**Use extensions for cross-cutting concerns**: + +```typescript +// Lambda layer with extension +const extensionLayer = lambda.LayerVersion.fromLayerVersionArn( + this, + 'Extension', + 'arn:aws:lambda:us-east-1:123456789:layer:my-extension:1' +); + +new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + layers: [extensionLayer], +}); +``` + +**Common extensions**: +- Secrets caching +- Configuration caching +- Custom logging +- Security scanning +- Performance monitoring + +### Graviton2 Architecture + +**20% better price/performance**: + +```typescript +new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + architecture: lambda.Architecture.ARM_64, // Graviton2 +}); +``` + +**Considerations**: +- Most runtimes support ARM64 +- Test thoroughly before migrating +- Dependencies must support ARM64 +- Native extensions may need recompilation + +### VPC Optimization + +**Hyperplane ENIs** (automatic since 2019): +- No ENI per function +- Faster cold starts in VPC +- Scales instantly + +```typescript +// Modern VPC configuration (fast) +new NodejsFunction(this, 'VpcFunction', { + entry: 'src/handler.ts', + vpc, + vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }, + // Fast scaling, no ENI limitations +}); +``` + +## Performance Monitoring + +### Key Metrics + +**Monitor these metrics**: +- **Duration**: p50, p95, p99, max +- **Cold Start %**: ColdStartDuration / TotalDuration +- **Error Rate**: Errors / Invocations +- **Throttles**: Indicates concurrency limit reached +- **Iterator Age**: For stream processing lag + +### Performance Dashboards + +```typescript +const dashboard = new cloudwatch.Dashboard(this, 'PerformanceDashboard'); + +dashboard.addWidgets( + new cloudwatch.GraphWidget({ + title: 'Latency Distribution', + left: [ + fn.metricDuration({ statistic: 'p50', label: 'p50' }), + fn.metricDuration({ statistic: 'p95', label: 'p95' }), + fn.metricDuration({ statistic: 'p99', label: 'p99' }), + fn.metricDuration({ statistic: 'Maximum', label: 'max' }), + ], + }), + new cloudwatch.GraphWidget({ + title: 'Memory Utilization', + left: [fn.metricDuration()], + right: [fn.metricErrors()], + }) +); +``` + +## Summary + +- **Cold Starts**: Optimize package size, use provisioned concurrency for critical paths +- **Memory**: More memory often = faster execution = lower cost +- **Initialization**: Initialize connections outside handler +- **Lazy Loading**: Load dependencies only when needed +- **Connection Reuse**: Enable for AWS SDK clients +- **Testing**: Test at different memory levels to find optimal configuration +- **Monitoring**: Track p99 latency, not average +- **Graviton2**: Consider ARM64 for better price/performance +- **Batch Operations**: Reduce round trips to services +- **Caching**: Cache frequently accessed data diff --git a/skills/aws-serverless-eda/references/security-best-practices.md b/skills/aws-serverless-eda/references/security-best-practices.md new file mode 100644 index 0000000..2853130 --- /dev/null +++ b/skills/aws-serverless-eda/references/security-best-practices.md @@ -0,0 +1,625 @@ +# Serverless Security Best Practices + +Security best practices for serverless applications based on AWS Well-Architected Framework. 
+
+## Table of Contents
+
+- [Shared Responsibility Model](#shared-responsibility-model)
+- [Identity and Access Management](#identity-and-access-management)
+- [Function Security](#function-security)
+- [API Security](#api-security)
+- [Data Protection](#data-protection)
+- [Network Security](#network-security)
+
+## Shared Responsibility Model
+
+### Serverless Shifts Responsibility to AWS
+
+With serverless, AWS takes on more security responsibilities:
+
+**AWS Responsibilities**:
+- Compute infrastructure
+- Execution environment
+- Runtime language and patches
+- Networking infrastructure
+- Server software and OS
+- Physical hardware and facilities
+- Automatic security patches (like Log4Shell mitigation)
+
+**Customer Responsibilities**:
+- Function code and dependencies
+- Resource configuration
+- Identity and Access Management (IAM)
+- Data encryption (at rest and in transit)
+- Application-level security
+- Secure coding practices
+
+### Benefits of Shifted Responsibility
+
+- **Automatic Patching**: AWS applies security patches automatically (e.g., Log4Shell fixed within 3 days)
+- **Infrastructure Security**: No OS patching, server hardening, or vulnerability scanning
+- **Operational Agility**: Quick security response at scale
+- **Focus on Code**: Spend time on business logic, not infrastructure security
+
+## Identity and Access Management
+
+### Least Privilege Principle
+
+**Always use least privilege IAM policies**:
+
+```typescript
+// ✅ GOOD - Specific grant
+const table = new dynamodb.Table(this, 'Table', {
+  partitionKey: { name: 'pk', type: dynamodb.AttributeType.STRING },
+});
+const fn = new NodejsFunction(this, 'Function', { entry: 'src/handler.ts' });
+
+table.grantReadData(fn); // Only read access
+
+// ❌ BAD - Overly broad
+fn.addToRolePolicy(new iam.PolicyStatement({
+  actions: ['dynamodb:*'],
+  resources: ['*'],
+}));
+```
+
+### Function Execution Role
+
+**Separate roles per function**:
+
+```typescript
+// ✅ GOOD - Each function has its own role
+const readFunction = new NodejsFunction(this, 'ReadFunction', {
+  entry: 'src/read.ts',
+  // Gets its own execution role
+});
+
+const writeFunction = new NodejsFunction(this, 'WriteFunction', {
+  entry: 'src/write.ts',
+  // Gets its own execution role
+});
+
+table.grantReadData(readFunction);
+table.grantReadWriteData(writeFunction);
+
+// ❌ BAD - Shared role with excessive permissions
+const sharedRole = new iam.Role(this, 'SharedRole', {
+  assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
+  managedPolicies: [
+    iam.ManagedPolicy.fromAwsManagedPolicyName('AdministratorAccess'), // Too broad!
+  ],
+});
+```
+
+### Resource-Based Policies
+
+Control who can invoke functions:
+
+```typescript
+// Allow API Gateway to invoke function
+myFunction.grantInvoke(new iam.ServicePrincipal('apigateway.amazonaws.com'));
+
+// Allow specific account
+myFunction.addPermission('AllowAccountInvoke', {
+  principal: new iam.AccountPrincipal('123456789012'),
+  action: 'lambda:InvokeFunction',
+});
+
+// Conditional invoke (only from a specific API)
+myFunction.addPermission('AllowApiInvoke', {
+  principal: new iam.ServicePrincipal('apigateway.amazonaws.com'),
+  action: 'lambda:InvokeFunction',
+  sourceArn: api.arnForExecuteApi(),
+});
+```
+
+### IAM Policies Best Practices
+
+1. **Use grant methods**: Prefer `.grantXxx()` over manual policies
+2. **Condition keys**: Use IAM conditions for fine-grained control
+3. **Resource ARNs**: Always specify resource ARNs, avoid wildcards
+4. **Session policies**: Use for temporary elevated permissions
+5. 
**Service Control Policies (SCPs)**: Enforce organization-wide guardrails + +## Function Security + +### Lambda Isolation Model + +**Each function runs in isolated sandbox**: +- Built on Firecracker microVMs +- Dedicated execution environment per function +- No shared memory between functions +- Isolated file system and network namespace +- Strong workload isolation + +**Execution Environment Security**: +- One concurrent invocation per environment +- Environment may be reused (warm starts) +- `/tmp` storage persists between invocations +- Sensitive data in memory may persist + +### Secure Coding Practices + +**Handle sensitive data securely**: + +```typescript +// ✅ GOOD - Clean up sensitive data +export const handler = async (event: any) => { + const apiKey = process.env.API_KEY; + + try { + const result = await callApi(apiKey); + return result; + } finally { + // Clear sensitive data from memory + delete process.env.API_KEY; + } +}; + +// ✅ GOOD - Use Secrets Manager +import { SecretsManagerClient, GetSecretValueCommand } from '@aws-sdk/client-secrets-manager'; + +const secretsClient = new SecretsManagerClient({}); + +export const handler = async (event: any) => { + const secret = await secretsClient.send( + new GetSecretValueCommand({ SecretId: process.env.SECRET_ARN }) + ); + + const apiKey = secret.SecretString; + // Use apiKey +}; +``` + +### Dependency Management + +**Scan dependencies for vulnerabilities**: + +```json +// package.json +{ + "scripts": { + "audit": "npm audit", + "audit:fix": "npm audit fix" + }, + "devDependencies": { + "snyk": "^1.0.0" + } +} +``` + +**Keep dependencies updated**: +- Run `npm audit` or `pip-audit` regularly +- Use Dependabot or Snyk for automated scanning +- Update dependencies promptly when vulnerabilities found +- Use minimal dependency sets + +### Environment Variable Security + +**Never store secrets in environment variables**: + +```typescript +// ❌ BAD - Secret in environment variable +new NodejsFunction(this, 'Function', { + environment: { + API_KEY: 'sk-1234567890abcdef', // Never do this! 
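+    // Plaintext environment variables are stored with the function
+    // configuration: they are visible in the console and to any principal
+    // allowed to call lambda:GetFunctionConfiguration.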
+ }, +}); + +// ✅ GOOD - Reference to secret +new NodejsFunction(this, 'Function', { + environment: { + SECRET_ARN: secret.secretArn, + }, +}); + +secret.grantRead(myFunction); +``` + +## API Security + +### API Gateway Security + +**Authentication and Authorization**: + +```typescript +// Cognito User Pool authorizer +const authorizer = new apigateway.CognitoUserPoolsAuthorizer(this, 'Authorizer', { + cognitoUserPools: [userPool], +}); + +api.root.addMethod('GET', integration, { + authorizer, + authorizationType: apigateway.AuthorizationType.COGNITO, +}); + +// Lambda authorizer for custom auth +const customAuthorizer = new apigateway.TokenAuthorizer(this, 'CustomAuth', { + handler: authorizerFunction, + resultsCacheTtl: Duration.minutes(5), +}); + +// IAM authorization for service-to-service +api.root.addMethod('POST', integration, { + authorizationType: apigateway.AuthorizationType.IAM, +}); +``` + +### Request Validation + +**Validate requests at API Gateway**: + +```typescript +const validator = new apigateway.RequestValidator(this, 'Validator', { + api, + validateRequestBody: true, + validateRequestParameters: true, +}); + +const model = api.addModel('Model', { + schema: { + type: apigateway.JsonSchemaType.OBJECT, + required: ['email', 'name'], + properties: { + email: { + type: apigateway.JsonSchemaType.STRING, + format: 'email', + }, + name: { + type: apigateway.JsonSchemaType.STRING, + minLength: 1, + maxLength: 100, + }, + }, + }, +}); + +resource.addMethod('POST', integration, { + requestValidator: validator, + requestModels: { + 'application/json': model, + }, +}); +``` + +### Rate Limiting and Throttling + +```typescript +const api = new apigateway.RestApi(this, 'Api', { + deployOptions: { + throttlingRateLimit: 1000, // requests per second + throttlingBurstLimit: 2000, // burst capacity + }, +}); + +// Per-method throttling +resource.addMethod('POST', integration, { + methodResponses: [{ statusCode: '200' }], + requestParameters: { + 'method.request.header.Authorization': true, + }, + throttling: { + rateLimit: 100, + burstLimit: 200, + }, +}); +``` + +### API Keys and Usage Plans + +```typescript +const apiKey = api.addApiKey('ApiKey', { + apiKeyName: 'customer-key', +}); + +const plan = api.addUsagePlan('UsagePlan', { + name: 'Standard', + throttle: { + rateLimit: 100, + burstLimit: 200, + }, + quota: { + limit: 10000, + period: apigateway.Period.MONTH, + }, +}); + +plan.addApiKey(apiKey); +plan.addApiStage({ + stage: api.deploymentStage, +}); +``` + +## Data Protection + +### Encryption at Rest + +**DynamoDB encryption**: + +```typescript +// Default: AWS-owned CMK (no additional cost) +const table = new dynamodb.Table(this, 'Table', { + encryption: dynamodb.TableEncryption.AWS_MANAGED, // AWS managed CMK +}); + +// Customer-managed CMK (for compliance) +const kmsKey = new kms.Key(this, 'Key', { + enableKeyRotation: true, +}); + +const table = new dynamodb.Table(this, 'Table', { + encryption: dynamodb.TableEncryption.CUSTOMER_MANAGED, + encryptionKey: kmsKey, +}); +``` + +**S3 encryption**: + +```typescript +// SSE-S3 (default, no additional cost) +const bucket = new s3.Bucket(this, 'Bucket', { + encryption: s3.BucketEncryption.S3_MANAGED, +}); + +// SSE-KMS (for fine-grained access control) +const bucket = new s3.Bucket(this, 'Bucket', { + encryption: s3.BucketEncryption.KMS, + encryptionKey: kmsKey, +}); +``` + +**SQS/SNS encryption**: + +```typescript +const queue = new sqs.Queue(this, 'Queue', { + encryption: sqs.QueueEncryption.KMS, + encryptionMasterKey: kmsKey, +}); 
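+
+// A minimal sketch (the `consumerFn` Lambda below is an assumed consumer,
+// not defined in this snippet): grant it access to the key as well,
+// otherwise ReceiveMessage calls fail with KMS access-denied errors.
+kmsKey.grantDecrypt(consumerFn);
+queue.grantConsumeMessages(consumerFn);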
+ +const topic = new sns.Topic(this, 'Topic', { + masterKey: kmsKey, +}); +``` + +### Encryption in Transit + +**All AWS service APIs use TLS**: +- API Gateway endpoints use HTTPS by default +- Lambda to AWS service communication encrypted +- EventBridge, SQS, SNS use TLS +- Custom domains can use ACM certificates + +```typescript +// API Gateway with custom domain +const certificate = new acm.Certificate(this, 'Certificate', { + domainName: 'api.example.com', + validation: acm.CertificateValidation.fromDns(hostedZone), +}); + +const api = new apigateway.RestApi(this, 'Api', { + domainName: { + domainName: 'api.example.com', + certificate, + }, +}); +``` + +### Data Sanitization + +**Validate and sanitize inputs**: + +```typescript +import DOMPurify from 'isomorphic-dompurify'; +import { z } from 'zod'; + +// Schema validation +const OrderSchema = z.object({ + orderId: z.string().uuid(), + amount: z.number().positive(), + email: z.string().email(), +}); + +export const handler = async (event: any) => { + const body = JSON.parse(event.body); + + // Validate schema + const result = OrderSchema.safeParse(body); + if (!result.success) { + return { + statusCode: 400, + body: JSON.stringify({ error: result.error }), + }; + } + + // Sanitize HTML inputs + const sanitized = { + ...result.data, + description: DOMPurify.sanitize(result.data.description), + }; + + await processOrder(sanitized); +}; +``` + +## Network Security + +### VPC Configuration + +**Lambda in VPC for private resources**: + +```typescript +const vpc = new ec2.Vpc(this, 'Vpc', { + maxAzs: 2, + natGateways: 1, +}); + +// Lambda in private subnet +const vpcFunction = new NodejsFunction(this, 'VpcFunction', { + entry: 'src/handler.ts', + vpc, + vpcSubnets: { + subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS, + }, + securityGroups: [securityGroup], +}); + +// Security group for Lambda +const securityGroup = new ec2.SecurityGroup(this, 'LambdaSG', { + vpc, + description: 'Security group for Lambda function', + allowAllOutbound: false, // Restrict outbound +}); + +// Only allow access to RDS +securityGroup.addEgressRule( + ec2.Peer.securityGroupId(rdsSecurityGroup.securityGroupId), + ec2.Port.tcp(3306), + 'Allow MySQL access' +); +``` + +### VPC Endpoints + +**Use VPC endpoints for AWS services**: + +```typescript +// S3 VPC endpoint (gateway endpoint, no cost) +vpc.addGatewayEndpoint('S3Endpoint', { + service: ec2.GatewayVpcEndpointAwsService.S3, +}); + +// DynamoDB VPC endpoint (gateway endpoint, no cost) +vpc.addGatewayEndpoint('DynamoDBEndpoint', { + service: ec2.GatewayVpcEndpointAwsService.DYNAMODB, +}); + +// Secrets Manager VPC endpoint (interface endpoint, cost applies) +vpc.addInterfaceEndpoint('SecretsManagerEndpoint', { + service: ec2.InterfaceVpcEndpointAwsService.SECRETS_MANAGER, + privateDnsEnabled: true, +}); +``` + +### Security Groups + +**Principle of least privilege for network access**: + +```typescript +// Lambda security group +const lambdaSG = new ec2.SecurityGroup(this, 'LambdaSG', { + vpc, + allowAllOutbound: false, +}); + +// RDS security group +const rdsSG = new ec2.SecurityGroup(this, 'RDSSG', { + vpc, + allowAllOutbound: false, +}); + +// Allow Lambda to access RDS only +rdsSG.addIngressRule( + ec2.Peer.securityGroupId(lambdaSG.securityGroupId), + ec2.Port.tcp(3306), + 'Allow Lambda access' +); + +lambdaSG.addEgressRule( + ec2.Peer.securityGroupId(rdsSG.securityGroupId), + ec2.Port.tcp(3306), + 'Allow RDS access' +); +``` + +## Security Monitoring + +### CloudWatch Logs + +**Enable and encrypt logs**: + 
+```typescript +new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + logRetention: logs.RetentionDays.ONE_WEEK, + logGroup: new logs.LogGroup(this, 'LogGroup', { + encryptionKey: kmsKey, // Encrypt logs + retention: logs.RetentionDays.ONE_WEEK, + }), +}); +``` + +### CloudTrail + +**Enable CloudTrail for audit**: + +```typescript +const trail = new cloudtrail.Trail(this, 'Trail', { + isMultiRegionTrail: true, + includeGlobalServiceEvents: true, + managementEvents: cloudtrail.ReadWriteType.ALL, +}); + +// Log Lambda invocations +trail.addLambdaEventSelector([{ + includeManagementEvents: true, + readWriteType: cloudtrail.ReadWriteType.ALL, +}]); +``` + +### GuardDuty + +**Enable GuardDuty for threat detection**: +- Analyzes VPC Flow Logs, DNS logs, CloudTrail events +- Detects unusual API activity +- Identifies compromised credentials +- Monitors for cryptocurrency mining + +## Security Best Practices Checklist + +### Development + +- [ ] Validate and sanitize all inputs +- [ ] Scan dependencies for vulnerabilities +- [ ] Use least privilege IAM permissions +- [ ] Store secrets in Secrets Manager or Parameter Store +- [ ] Never log sensitive data +- [ ] Enable encryption for all data stores +- [ ] Use environment variables for configuration, not secrets + +### Deployment + +- [ ] Enable CloudTrail in all regions +- [ ] Configure VPC for sensitive workloads +- [ ] Use VPC endpoints for AWS service access +- [ ] Enable GuardDuty for threat detection +- [ ] Implement resource-based policies +- [ ] Use AWS WAF for API protection +- [ ] Enable access logging for API Gateway + +### Operations + +- [ ] Monitor CloudTrail for unusual activity +- [ ] Set up alarms for security events +- [ ] Rotate secrets regularly +- [ ] Review IAM policies periodically +- [ ] Audit function permissions +- [ ] Monitor GuardDuty findings +- [ ] Implement automated security responses + +### Testing + +- [ ] Test with least privilege policies +- [ ] Validate error handling for security failures +- [ ] Test input validation and sanitization +- [ ] Verify encryption configurations +- [ ] Test with malicious payloads +- [ ] Audit logs for security events + +## Summary + +- **Shared Responsibility**: AWS handles infrastructure, you handle application security +- **Least Privilege**: Use IAM grant methods, avoid wildcards +- **Encryption**: Enable encryption at rest and in transit +- **Input Validation**: Validate and sanitize all inputs +- **Dependency Security**: Scan and update dependencies regularly +- **Monitoring**: Enable CloudTrail, GuardDuty, and CloudWatch +- **Secrets Management**: Use Secrets Manager, never environment variables +- **Network Security**: Use VPC, security groups, and VPC endpoints appropriately diff --git a/skills/aws-serverless-eda/references/serverless-patterns.md b/skills/aws-serverless-eda/references/serverless-patterns.md new file mode 100644 index 0000000..746c293 --- /dev/null +++ b/skills/aws-serverless-eda/references/serverless-patterns.md @@ -0,0 +1,838 @@ +# Serverless Architecture Patterns + +Comprehensive patterns for building serverless applications on AWS based on Well-Architected Framework principles. 
+ +## Table of Contents + +- [Core Serverless Patterns](#core-serverless-patterns) +- [API Patterns](#api-patterns) +- [Data Processing Patterns](#data-processing-patterns) +- [Integration Patterns](#integration-patterns) +- [Orchestration Patterns](#orchestration-patterns) +- [Anti-Patterns](#anti-patterns) + +## Core Serverless Patterns + +### Pattern: Serverless Microservices + +**Use case**: Independent, scalable services with separate databases + +**Architecture**: +``` +API Gateway → Lambda Functions → DynamoDB/RDS + ↓ (events) + EventBridge → Other Services +``` + +**CDK Implementation**: +```typescript +// User Service +const userTable = new dynamodb.Table(this, 'Users', { + partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING }, + billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, +}); + +const userFunction = new NodejsFunction(this, 'UserHandler', { + entry: 'src/services/users/handler.ts', + environment: { + TABLE_NAME: userTable.tableName, + }, +}); + +userTable.grantReadWriteData(userFunction); + +// Order Service (separate database) +const orderTable = new dynamodb.Table(this, 'Orders', { + partitionKey: { name: 'orderId', type: dynamodb.AttributeType.STRING }, + billingMode: dynamodb.BillingMode.PAY_PER_REQUEST, +}); + +const orderFunction = new NodejsFunction(this, 'OrderHandler', { + entry: 'src/services/orders/handler.ts', + environment: { + TABLE_NAME: orderTable.tableName, + EVENT_BUS: eventBus.eventBusName, + }, +}); + +orderTable.grantReadWriteData(orderFunction); +eventBus.grantPutEventsTo(orderFunction); +``` + +**Benefits**: +- Independent deployment and scaling +- Database per service (data isolation) +- Technology diversity +- Fault isolation + +### Pattern: Serverless API Backend + +**Use case**: REST or GraphQL API with serverless compute + +**REST API with API Gateway**: +```typescript +const api = new apigateway.RestApi(this, 'Api', { + restApiName: 'serverless-api', + deployOptions: { + stageName: 'prod', + tracingEnabled: true, + loggingLevel: apigateway.MethodLoggingLevel.INFO, + dataTraceEnabled: true, + metricsEnabled: true, + }, + defaultCorsPreflightOptions: { + allowOrigins: apigateway.Cors.ALL_ORIGINS, + allowMethods: apigateway.Cors.ALL_METHODS, + }, +}); + +// Resource-based routing +const items = api.root.addResource('items'); +items.addMethod('GET', new apigateway.LambdaIntegration(listFunction)); +items.addMethod('POST', new apigateway.LambdaIntegration(createFunction)); + +const item = items.addResource('{id}'); +item.addMethod('GET', new apigateway.LambdaIntegration(getFunction)); +item.addMethod('PUT', new apigateway.LambdaIntegration(updateFunction)); +item.addMethod('DELETE', new apigateway.LambdaIntegration(deleteFunction)); +``` + +**GraphQL API with AppSync**: +```typescript +const api = new appsync.GraphqlApi(this, 'Api', { + name: 'serverless-graphql-api', + schema: appsync.SchemaFile.fromAsset('schema.graphql'), + authorizationConfig: { + defaultAuthorization: { + authorizationType: appsync.AuthorizationType.API_KEY, + }, + }, + xrayEnabled: true, +}); + +// Lambda resolver +const dataSource = api.addLambdaDataSource('lambda-ds', resolverFunction); + +dataSource.createResolver('QueryGetItem', { + typeName: 'Query', + fieldName: 'getItem', +}); +``` + +### Pattern: Serverless Data Lake + +**Use case**: Ingest, process, and analyze large-scale data + +**Architecture**: +``` +S3 (raw data) → Lambda (transform) → S3 (processed) + ↓ (catalog) + AWS Glue → Athena (query) +``` + +**Implementation**: +```typescript +const 
rawBucket = new s3.Bucket(this, 'RawData'); +const processedBucket = new s3.Bucket(this, 'ProcessedData'); + +// Trigger Lambda on file upload +rawBucket.addEventNotification( + s3.EventType.OBJECT_CREATED, + new s3n.LambdaDestination(transformFunction), + { prefix: 'incoming/' } +); + +// Transform function +export const transform = async (event: S3Event) => { + for (const record of event.Records) { + const key = record.s3.object.key; + + // Get raw data + const raw = await s3.getObject({ + Bucket: record.s3.bucket.name, + Key: key, + }); + + // Transform data + const transformed = await transformData(raw.Body); + + // Write to processed bucket + await s3.putObject({ + Bucket: process.env.PROCESSED_BUCKET, + Key: `processed/${key}`, + Body: JSON.stringify(transformed), + }); + } +}; +``` + +## API Patterns + +### Pattern: Authorizer Pattern + +**Use case**: Custom authentication and authorization + +```typescript +// Lambda authorizer +const authorizer = new apigateway.TokenAuthorizer(this, 'Authorizer', { + handler: authorizerFunction, + identitySource: 'method.request.header.Authorization', + resultsCacheTtl: Duration.minutes(5), +}); + +// Apply to API methods +const resource = api.root.addResource('protected'); +resource.addMethod('GET', new apigateway.LambdaIntegration(protectedFunction), { + authorizer, +}); +``` + +### Pattern: Request Validation + +**Use case**: Validate requests before Lambda invocation + +```typescript +const requestModel = api.addModel('RequestModel', { + contentType: 'application/json', + schema: { + type: apigateway.JsonSchemaType.OBJECT, + required: ['name', 'email'], + properties: { + name: { type: apigateway.JsonSchemaType.STRING, minLength: 1 }, + email: { type: apigateway.JsonSchemaType.STRING, format: 'email' }, + }, + }, +}); + +resource.addMethod('POST', integration, { + requestValidator: new apigateway.RequestValidator(this, 'Validator', { + api, + validateRequestBody: true, + validateRequestParameters: true, + }), + requestModels: { + 'application/json': requestModel, + }, +}); +``` + +### Pattern: Response Caching + +**Use case**: Reduce backend load and improve latency + +```typescript +const api = new apigateway.RestApi(this, 'Api', { + deployOptions: { + cachingEnabled: true, + cacheTtl: Duration.minutes(5), + cacheClusterEnabled: true, + cacheClusterSize: '0.5', // GB + }, +}); + +// Enable caching per method +resource.addMethod('GET', integration, { + methodResponses: [{ + statusCode: '200', + responseParameters: { + 'method.response.header.Cache-Control': true, + }, + }], +}); +``` + +## Data Processing Patterns + +### Pattern: S3 Event Processing + +**Use case**: Process files uploaded to S3 + +```typescript +const bucket = new s3.Bucket(this, 'DataBucket'); + +// Process images +bucket.addEventNotification( + s3.EventType.OBJECT_CREATED, + new s3n.LambdaDestination(imageProcessingFunction), + { suffix: '.jpg' } +); + +// Process CSV files +bucket.addEventNotification( + s3.EventType.OBJECT_CREATED, + new s3n.LambdaDestination(csvProcessingFunction), + { suffix: '.csv' } +); + +// Large file processing with Step Functions +bucket.addEventNotification( + s3.EventType.OBJECT_CREATED, + new s3n.SfnDestination(processingStateMachine), + { prefix: 'large-files/' } +); +``` + +### Pattern: DynamoDB Streams Processing + +**Use case**: React to database changes + +```typescript +const table = new dynamodb.Table(this, 'Table', { + partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING }, + stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, 
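+  // NEW_AND_OLD_IMAGES puts both the previous and the updated item on each
+  // stream record, which the sync-to-search consumer below relies on
+  // (index the new image on INSERT/MODIFY, delete by key on REMOVE)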
+}); + +// Process stream changes +new lambda.EventSourceMapping(this, 'StreamConsumer', { + target: streamProcessorFunction, + eventSourceArn: table.tableStreamArn, + startingPosition: lambda.StartingPosition.LATEST, + batchSize: 100, + maxBatchingWindow: Duration.seconds(5), + bisectBatchOnError: true, + retryAttempts: 3, +}); + +// Example: Sync to search index +export const processStream = async (event: DynamoDBStreamEvent) => { + for (const record of event.Records) { + if (record.eventName === 'INSERT' || record.eventName === 'MODIFY') { + const newImage = record.dynamodb?.NewImage; + await elasticSearch.index({ + index: 'items', + id: newImage?.id.S, + body: unmarshall(newImage), + }); + } else if (record.eventName === 'REMOVE') { + await elasticSearch.delete({ + index: 'items', + id: record.dynamodb?.Keys?.id.S, + }); + } + } +}; +``` + +### Pattern: Kinesis Stream Processing + +**Use case**: Real-time data streaming and analytics + +```typescript +const stream = new kinesis.Stream(this, 'EventStream', { + shardCount: 2, + streamMode: kinesis.StreamMode.PROVISIONED, +}); + +// Fan-out with multiple consumers +const consumer1 = new lambda.EventSourceMapping(this, 'Analytics', { + target: analyticsFunction, + eventSourceArn: stream.streamArn, + startingPosition: lambda.StartingPosition.LATEST, + batchSize: 100, + parallelizationFactor: 10, // Process 10 batches per shard in parallel +}); + +const consumer2 = new lambda.EventSourceMapping(this, 'Alerting', { + target: alertingFunction, + eventSourceArn: stream.streamArn, + startingPosition: lambda.StartingPosition.LATEST, + filters: [ + lambda.FilterCriteria.filter({ + eventName: lambda.FilterRule.isEqual('CRITICAL_EVENT'), + }), + ], +}); +``` + +## Integration Patterns + +### Pattern: Service Integration with EventBridge + +**Use case**: Decouple services with events + +```typescript +const eventBus = new events.EventBus(this, 'AppBus'); + +// Service A publishes events +const serviceA = new NodejsFunction(this, 'ServiceA', { + entry: 'src/services/a/handler.ts', + environment: { + EVENT_BUS: eventBus.eventBusName, + }, +}); + +eventBus.grantPutEventsTo(serviceA); + +// Service B subscribes to events +new events.Rule(this, 'ServiceBRule', { + eventBus, + eventPattern: { + source: ['service.a'], + detailType: ['EntityCreated'], + }, + targets: [new targets.LambdaFunction(serviceBFunction)], +}); + +// Service C subscribes to same events +new events.Rule(this, 'ServiceCRule', { + eventBus, + eventPattern: { + source: ['service.a'], + detailType: ['EntityCreated'], + }, + targets: [new targets.LambdaFunction(serviceCFunction)], +}); +``` + +### Pattern: API Gateway + SQS Integration + +**Use case**: Async API requests without Lambda + +```typescript +const queue = new sqs.Queue(this, 'RequestQueue'); + +const api = new apigateway.RestApi(this, 'Api'); + +// Direct SQS integration (no Lambda) +const sqsIntegration = new apigateway.AwsIntegration({ + service: 'sqs', + path: `${process.env.AWS_ACCOUNT_ID}/${queue.queueName}`, + integrationHttpMethod: 'POST', + options: { + credentialsRole: sqsRole, + requestParameters: { + 'integration.request.header.Content-Type': "'application/x-www-form-urlencoded'", + }, + requestTemplates: { + 'application/json': 'Action=SendMessage&MessageBody=$input.body', + }, + integrationResponses: [{ + statusCode: '200', + }], + }, +}); + +api.root.addMethod('POST', sqsIntegration, { + methodResponses: [{ statusCode: '200' }], +}); +``` + +### Pattern: EventBridge + Step Functions + +**Use case**: 
Event-triggered workflow orchestration + +```typescript +// State machine for order processing +const orderStateMachine = new stepfunctions.StateMachine(this, 'OrderFlow', { + definition: /* ... */, +}); + +// EventBridge triggers state machine +new events.Rule(this, 'OrderPlacedRule', { + eventPattern: { + source: ['orders'], + detailType: ['OrderPlaced'], + }, + targets: [new targets.SfnStateMachine(orderStateMachine)], +}); +``` + +## Orchestration Patterns + +### Pattern: Sequential Workflow + +**Use case**: Multi-step process with dependencies + +```typescript +const definition = new tasks.LambdaInvoke(this, 'Step1', { + lambdaFunction: step1Function, + outputPath: '$.Payload', +}) + .next(new tasks.LambdaInvoke(this, 'Step2', { + lambdaFunction: step2Function, + outputPath: '$.Payload', + })) + .next(new tasks.LambdaInvoke(this, 'Step3', { + lambdaFunction: step3Function, + outputPath: '$.Payload', + })); + +new stepfunctions.StateMachine(this, 'Sequential', { + definition, +}); +``` + +### Pattern: Parallel Execution + +**Use case**: Execute independent tasks concurrently + +```typescript +const parallel = new stepfunctions.Parallel(this, 'ParallelProcessing'); + +parallel.branch(new tasks.LambdaInvoke(this, 'ProcessA', { + lambdaFunction: functionA, +})); + +parallel.branch(new tasks.LambdaInvoke(this, 'ProcessB', { + lambdaFunction: functionB, +})); + +parallel.branch(new tasks.LambdaInvoke(this, 'ProcessC', { + lambdaFunction: functionC, +})); + +const definition = parallel.next(new tasks.LambdaInvoke(this, 'Aggregate', { + lambdaFunction: aggregateFunction, +})); + +new stepfunctions.StateMachine(this, 'Parallel', { definition }); +``` + +### Pattern: Map State (Dynamic Parallelism) + +**Use case**: Process array of items in parallel + +```typescript +const mapState = new stepfunctions.Map(this, 'ProcessItems', { + maxConcurrency: 10, + itemsPath: '$.items', +}); + +mapState.iterator(new tasks.LambdaInvoke(this, 'ProcessItem', { + lambdaFunction: processItemFunction, +})); + +const definition = mapState.next(new tasks.LambdaInvoke(this, 'Finalize', { + lambdaFunction: finalizeFunction, +})); +``` + +### Pattern: Choice State (Conditional Logic) + +**Use case**: Branching logic based on input + +```typescript +const choice = new stepfunctions.Choice(this, 'OrderType'); + +choice.when( + stepfunctions.Condition.stringEquals('$.orderType', 'STANDARD'), + standardProcessing +); + +choice.when( + stepfunctions.Condition.stringEquals('$.orderType', 'EXPRESS'), + expressProcessing +); + +choice.otherwise(defaultProcessing); +``` + +### Pattern: Wait State + +**Use case**: Delay between steps or wait for callbacks + +```typescript +// Fixed delay +const wait = new stepfunctions.Wait(this, 'Wait30Seconds', { + time: stepfunctions.WaitTime.duration(Duration.seconds(30)), +}); + +// Wait until timestamp +const waitUntil = new stepfunctions.Wait(this, 'WaitUntil', { + time: stepfunctions.WaitTime.timestampPath('$.expiryTime'), +}); + +// Wait for callback (.waitForTaskToken) +const waitForCallback = new tasks.LambdaInvoke(this, 'WaitForApproval', { + lambdaFunction: approvalFunction, + integrationPattern: stepfunctions.IntegrationPattern.WAIT_FOR_TASK_TOKEN, + payload: stepfunctions.TaskInput.fromObject({ + token: stepfunctions.JsonPath.taskToken, + data: stepfunctions.JsonPath.entirePayload, + }), +}); +``` + +## Anti-Patterns + +### ❌ Lambda Monolith + +**Problem**: Single Lambda handling all operations + +```typescript +// BAD +export const handler = async (event: any) => { + switch 
(event.operation) { + case 'createUser': return createUser(event); + case 'getUser': return getUser(event); + case 'updateUser': return updateUser(event); + case 'deleteUser': return deleteUser(event); + case 'createOrder': return createOrder(event); + // ... 20 more operations + } +}; +``` + +**Solution**: Separate Lambda functions per operation + +```typescript +// GOOD - Separate functions +export const createUser = async (event: any) => { /* ... */ }; +export const getUser = async (event: any) => { /* ... */ }; +export const updateUser = async (event: any) => { /* ... */ }; +``` + +### ❌ Recursive Lambda Pattern + +**Problem**: Lambda invoking itself (runaway costs) + +```typescript +// BAD +export const handler = async (event: any) => { + await processItem(event); + + if (hasMoreItems()) { + await lambda.invoke({ + FunctionName: process.env.AWS_LAMBDA_FUNCTION_NAME, + InvocationType: 'Event', + Payload: JSON.stringify({ /* next batch */ }), + }); + } +}; +``` + +**Solution**: Use SQS or Step Functions + +```typescript +// GOOD - Use SQS for iteration +export const handler = async (event: SQSEvent) => { + for (const record of event.Records) { + await processItem(record); + } + // SQS handles iteration automatically +}; +``` + +### ❌ Lambda Chaining + +**Problem**: Lambda directly invoking another Lambda + +```typescript +// BAD +export const handler1 = async (event: any) => { + const result = await processStep1(event); + + // Directly invoking next Lambda + await lambda.invoke({ + FunctionName: 'handler2', + Payload: JSON.stringify(result), + }); +}; +``` + +**Solution**: Use EventBridge, SQS, or Step Functions + +```typescript +// GOOD - Publish to EventBridge +export const handler1 = async (event: any) => { + const result = await processStep1(event); + + await eventBridge.putEvents({ + Entries: [{ + Source: 'service.step1', + DetailType: 'Step1Completed', + Detail: JSON.stringify(result), + }], + }); +}; +``` + +### ❌ Synchronous Waiting in Lambda + +**Problem**: Lambda waiting for slow operations + +```typescript +// BAD - Blocking on slow operation +export const handler = async (event: any) => { + await startBatchJob(); // Returns immediately + + // Wait for job to complete (wastes Lambda time) + while (true) { + const status = await checkJobStatus(); + if (status === 'COMPLETE') break; + await sleep(1000); + } +}; +``` + +**Solution**: Use Step Functions with callback pattern + +```typescript +// GOOD - Step Functions waits, not Lambda +const waitForJob = new tasks.LambdaInvoke(this, 'StartJob', { + lambdaFunction: startJobFunction, + integrationPattern: stepfunctions.IntegrationPattern.WAIT_FOR_TASK_TOKEN, + payload: stepfunctions.TaskInput.fromObject({ + token: stepfunctions.JsonPath.taskToken, + }), +}); +``` + +### ❌ Large Deployment Packages + +**Problem**: Large Lambda packages increase cold start time + +**Solution**: +- Use layers for shared dependencies +- Externalize AWS SDK +- Minimize bundle size + +```typescript +new NodejsFunction(this, 'Function', { + entry: 'src/handler.ts', + bundling: { + minify: true, + externalModules: ['@aws-sdk/*'], // Provided by runtime + nodeModules: ['only-needed-deps'], // Selective bundling + }, +}); +``` + +## Performance Optimization + +### Cold Start Optimization + +**Techniques**: +1. Minimize package size +2. Use provisioned concurrency for critical paths +3. Lazy load dependencies +4. Reuse connections outside handler +5. 
Use Lambda SnapStart (Java, Python, or .NET)
+
+```typescript
+// For latency-sensitive APIs
+const apiFunction = new NodejsFunction(this, 'ApiFunction', {
+  entry: 'src/api.ts',
+  memorySize: 1769, // 1 vCPU for faster initialization
+});
+
+// Auto-scale provisioned concurrency on the 'live' alias
+const alias = apiFunction.currentVersion.addAlias('live');
+alias.addAutoScaling({
+  minCapacity: 2,
+  maxCapacity: 10,
+}).scaleOnUtilization({
+  utilizationTarget: 0.7,
+});
+```
+
+### Right-Sizing Memory
+
+**Test different memory configurations**:
+
+```typescript
+// CPU-bound workload
+new NodejsFunction(this, 'ComputeFunction', {
+  memorySize: 1769, // 1 vCPU
+  timeout: Duration.seconds(30),
+});
+
+// I/O-bound workload
+new NodejsFunction(this, 'IOFunction', {
+  memorySize: 512, // Less CPU needed
+  timeout: Duration.seconds(60),
+});
+
+// Simple operations
+new NodejsFunction(this, 'SimpleFunction', {
+  memorySize: 256,
+  timeout: Duration.seconds(10),
+});
+```
+
+### Concurrent Execution Control
+
+```typescript
+// Protect downstream services
+new NodejsFunction(this, 'ThrottledFunction', {
+  reservedConcurrentExecutions: 10, // Max 10 concurrent
+});
+
+// Unreserved concurrency (shared pool)
+new NodejsFunction(this, 'DefaultFunction', {
+  // Uses unreserved account concurrency
+});
+```
+
+## Testing Strategies
+
+### Unit Testing
+
+Test business logic separately from AWS services:
+
+```typescript
+// handler.ts
+export const processOrder = async (order: Order): Promise<ProcessedOrder> => {
+  // Business logic (easily testable)
+  const validated = validateOrder(order);
+  const priced = calculatePrice(validated);
+  return transformResult(priced);
+};
+
+export const handler = async (event: any): Promise<APIGatewayProxyResult> => {
+  const order = parseEvent(event);
+  const result = await processOrder(order);
+  await saveToDatabase(result);
+  return formatResponse(result);
+};
+
+// handler.test.ts
+test('processOrder calculates price correctly', async () => {
+  const order = { items: [{ price: 10, quantity: 2 }] };
+  const result = await processOrder(order);
+  expect(result.total).toBe(20);
+});
+```
+
+### Integration Testing
+
+Test with actual AWS services:
+
+```typescript
+// integration.test.ts
+import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';
+
+test('Lambda processes order correctly', async () => {
+  const lambda = new LambdaClient({});
+
+  const response = await lambda.send(new InvokeCommand({
+    FunctionName: process.env.FUNCTION_NAME,
+    Payload: JSON.stringify({ orderId: '123' }),
+  }));
+
+  const result = JSON.parse(Buffer.from(response.Payload!).toString());
+  expect(result.statusCode).toBe(200);
+});
+```
+
+### Local Testing with SAM
+
+```bash
+# Test API locally
+sam local start-api
+
+# Invoke function locally
+sam local invoke MyFunction -e events/test-event.json
+
+# Generate sample event
+sam local generate-event apigateway aws-proxy > event.json
+```
+
+## Summary
+
+- **Single Purpose**: One function, one responsibility
+- **Concurrent Design**: Think concurrency, not volume
+- **Stateless**: Use external storage for state
+- **State Machines**: Orchestrate with Step Functions
+- **Event-Driven**: Use events over direct calls
+- **Idempotent**: Handle failures and duplicates gracefully (see the sketch below)
+- **Observability**: Enable tracing and structured logging
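+
+To make the event-driven and idempotency points above concrete, here is a minimal sketch of an SQS-triggered consumer that deduplicates redeliveries with a DynamoDB conditional write. The `IDEMPOTENCY_TABLE` environment variable, the `idempotencyKey` message attribute, and the `processItem` helper are assumptions made for illustration, not part of the patterns above.
+
+```typescript
+import { SQSEvent } from 'aws-lambda';
+import {
+  DynamoDBClient,
+  PutItemCommand,
+  ConditionalCheckFailedException,
+} from '@aws-sdk/client-dynamodb';
+
+const ddb = new DynamoDBClient({});
+// Assumed table with partition key `idempotencyKey` and TTL on `expiresAt`
+const IDEMPOTENCY_TABLE = process.env.IDEMPOTENCY_TABLE!;
+
+async function processItem(message: Record<string, unknown>): Promise<void> {
+  // Business logic placeholder
+}
+
+export const handler = async (event: SQSEvent): Promise<void> => {
+  for (const record of event.Records) {
+    const message = JSON.parse(record.body);
+
+    try {
+      // Claim the key; the conditional write fails if this message was already processed
+      await ddb.send(new PutItemCommand({
+        TableName: IDEMPOTENCY_TABLE,
+        Item: {
+          idempotencyKey: { S: message.idempotencyKey },
+          expiresAt: { N: `${Math.floor(Date.now() / 1000) + 86400}` }, // expire after 24h
+        },
+        ConditionExpression: 'attribute_not_exists(idempotencyKey)',
+      }));
+    } catch (err) {
+      if (err instanceof ConditionalCheckFailedException) {
+        continue; // Duplicate delivery - skip without failing the batch
+      }
+      throw err; // Real failure - let SQS retry and eventually route to the DLQ
+    }
+
+    await processItem(message);
+  }
+};
+```
+
+In production this pattern is also available off the shelf (for example, the idempotency utility in Powertools for AWS Lambda), but the core idea is the same: record a unique key with a conditional write before performing the side effect.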