commit 2eb240934ea0d996bcf02b2b1447ab1cae584132 Author: Zhongwei Li Date: Sat Nov 29 17:56:59 2025 +0800 Initial commit diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..71eeee1 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,15 @@ +{ + "name": "specweave-n8n", + "description": "n8n workflow automation integration with Kafka - Event-driven workflows, Kafka triggers, producers, consumers, and workflow patterns for no-code/low-code event processing", + "version": "0.24.0", + "author": { + "name": "SpecWeave Team", + "url": "https://spec-weave.com" + }, + "skills": [ + "./skills" + ], + "commands": [ + "./commands" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..5bf113f --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# specweave-n8n + +n8n workflow automation integration with Kafka - Event-driven workflows, Kafka triggers, producers, consumers, and workflow patterns for no-code/low-code event processing diff --git a/commands/workflow-template.md b/commands/workflow-template.md new file mode 100644 index 0000000..6997d3e --- /dev/null +++ b/commands/workflow-template.md @@ -0,0 +1,262 @@ +--- +name: specweave-n8n:workflow-template +description: Generate n8n workflow JSON template with Kafka trigger/producer nodes. Creates event-driven workflow patterns (fan-out, retry+DLQ, enrichment, CDC). +--- + +# Generate n8n Workflow Template + +Create ready-to-use n8n workflow JSON files with Kafka integration patterns. + +## What This Command Does + +1. **Select Pattern**: Choose from common event-driven patterns +2. **Configure Kafka**: Specify topics, consumer groups, brokers +3. **Customize Workflow**: Add enrichment, transformations, error handling +4. **Generate JSON**: Export n8n-compatible workflow file +5. **Import Instructions**: How to load into n8n UI + +## Available Patterns + +### Pattern 1: Kafka Trigger → HTTP Enrichment → Kafka Producer +**Use Case**: Event enrichment with external API + +**Workflow**: +``` +[Kafka Trigger] → [HTTP Request] → [Set/Transform] → [Kafka Producer] + ↓ ↓ ↓ + Input topic Enrich data Output topic +``` + +**Configuration**: +- Input topic (e.g., `orders`) +- API endpoint (e.g., `https://api.example.com/customers/{id}`) +- Output topic (e.g., `enriched-orders`) + +### Pattern 2: Kafka Trigger → Fan-Out +**Use Case**: Single event triggers multiple downstream topics + +**Workflow**: +``` +[Kafka Trigger] → [Switch] → [Kafka Producer] (high-priority) + ↓ ↓ + Input └─→ [Kafka Producer] (all-events) + └─→ [Kafka Producer] (analytics) +``` + +### Pattern 3: Retry with Dead Letter Queue +**Use Case**: Fault-tolerant processing with retry logic + +**Workflow**: +``` +[Kafka Trigger] → [Try] → [Process] → [Kafka Producer] (success) + ↓ ↓ + Input [Catch] → [Increment Retry Count] + ↓ + retry < 3 ? + ↓ + [Kafka Producer] (retry-topic) + ↓ + [Kafka Producer] (dlq-topic) +``` + +### Pattern 4: Change Data Capture (CDC) +**Use Case**: Database polling → Kafka events + +**Workflow**: +``` +[Cron: Every 1m] → [PostgreSQL Query] → [Compare] → [Kafka Producer] + ↓ ↓ + Get new rows Detect changes + ↓ + Publish CDC events +``` + +## Example Usage + +```bash +# Generate workflow template +/specweave-n8n:workflow-template + +# I'll ask: +# 1. Which pattern? (Enrichment, Fan-Out, Retry+DLQ, CDC) +# 2. Input topic name? +# 3. Output topic(s)? +# 4. Kafka broker (default: localhost:9092)? +# 5. Consumer group name? 
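+#
+# Example answers from a hypothetical run:
+#   Pattern: Enrichment | Input topic: orders | Output topic: enriched-orders
+#   Broker: localhost:9092 | Consumer group: order-processor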
+
+# Then I'll generate:
+# - workflow.json (importable into n8n)
+# - README.md with setup instructions
+# - .env.example with required variables
+```
+
+## Generated Files
+
+**1. workflow.json**: n8n workflow definition
+```json
+{
+  "name": "Kafka Event Enrichment",
+  "nodes": [
+    {
+      "type": "n8n-nodes-base.kafkaTrigger",
+      "name": "Kafka Trigger",
+      "parameters": {
+        "topic": "orders",
+        "groupId": "order-processor",
+        "brokers": "localhost:9092"
+      }
+    },
+    {
+      "type": "n8n-nodes-base.httpRequest",
+      "name": "Enrich Customer Data",
+      "parameters": {
+        "url": "=https://api.example.com/customers/{{$json.customerId}}",
+        "authentication": "genericCredentialType"
+      }
+    },
+    {
+      "type": "n8n-nodes-base.set",
+      "name": "Transform",
+      "parameters": {
+        "values": {
+          "orderId": "={{$json.order.id}}",
+          "customerName": "={{$json.customer.name}}"
+        }
+      }
+    },
+    {
+      "type": "n8n-nodes-base.kafka",
+      "name": "Kafka Producer",
+      "parameters": {
+        "topic": "enriched-orders",
+        "brokers": "localhost:9092"
+      }
+    }
+  ],
+  "connections": { ... }
+}
+```
+
+**2. README.md**: Import instructions
+```markdown
+# Import Workflow into n8n
+
+1. Open n8n UI (http://localhost:5678)
+2. Click "Workflows" → "Import from File"
+3. Select workflow.json
+4. Configure credentials (Kafka, HTTP API)
+5. Activate workflow
+6. Test with sample event
+```
+
+**3. .env.example**: Required environment variables
+```bash
+KAFKA_BROKERS=localhost:9092
+KAFKA_SASL_USERNAME=your-username
+KAFKA_SASL_PASSWORD=your-password
+API_ENDPOINT=https://api.example.com
+API_TOKEN=your-api-token
+```
+
+## Import into n8n
+
+**Via UI**:
+1. n8n Dashboard → Workflows → Import from File
+2. Select generated workflow.json
+3. Configure Kafka credentials
+4. Activate workflow
+
+**Via CLI**:
+```bash
+# Import workflow
+n8n import:workflow --input=workflow.json
+
+# List workflows
+n8n list:workflow
+```
+
+## Configuration Options
+
+### Kafka Settings
+- **Brokers**: Comma-separated list (e.g., `broker1:9092,broker2:9092`)
+- **Consumer Group**: Unique identifier for this workflow
+- **Offset**: `earliest` (replay) or `latest` (new messages only)
+- **Auto Commit**: `true` (recommended) or `false` (manual)
+- **SSL/SASL**: Authentication credentials for secure clusters
+
+### Error Handling
+- **Retry Count**: Maximum retries before DLQ (default: 3)
+- **Backoff Strategy**: Exponential (1s, 2s, 4s, 8s)
+- **DLQ Topic**: Dead letter queue for failed messages
+- **Alert on Failure**: Send Slack/email notification
+
+### Performance
+- **Batch Size**: Process N messages at once (default: 1)
+- **Batch Timeout**: Wait up to N ms for a full batch (default: 5000)
+- **Parallel Execution**: Enable for HTTP enrichment (default: disabled)
+- **Max Memory**: Limit workflow memory usage
+
+## Testing
+
+**Manual Test**:
+```bash
+# 1. Produce test event
+echo '{"orderId": 123, "customerId": 456}' | \
+  kcat -P -b localhost:9092 -t orders
+
+# 2. Check n8n execution log
+# n8n UI → Executions → View latest run
+
+# 3. Consume output
+kcat -C -b localhost:9092 -t enriched-orders
+```
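+
+If you prefer a scripted producer over kcat, the sketch below publishes the same sample event with the kafkajs client (an assumption — this plugin does not ship it; any Kafka client works, and `npm install kafkajs` is required):
+
+```javascript
+// test-producer.js — publish a sample order event (run: node test-producer.js)
+const { Kafka } = require('kafkajs');
+
+const kafka = new Kafka({ clientId: 'workflow-test', brokers: ['localhost:9092'] });
+const producer = kafka.producer();
+
+async function main() {
+  await producer.connect();
+  await producer.send({
+    topic: 'orders',
+    messages: [
+      // Key by customerId so events for one customer stay on one partition
+      { key: '456', value: JSON.stringify({ orderId: 123, customerId: 456 }) },
+    ],
+  });
+  await producer.disconnect();
+}
+
+main().catch(console.error);
+```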
+
+**Automated Test**:
+```bash
+# Execute a saved workflow by ID (import it first)
+n8n execute --id <workflow-id>
+
+# Expected output: success status
+```
+
+## Troubleshooting
+
+### Issue 1: Workflow Not Triggering
+**Solution**: Check the Kafka connection
+```bash
+# Test Kafka connectivity
+kcat -L -b localhost:9092
+
+# Verify consumer group registered
+kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
+  --describe --group order-processor
+```
+
+### Issue 2: Messages Not Being Consumed
+**Solution**: Check the offset position
+- n8n UI → Workflow → Kafka Trigger → Offset
+- Set to `earliest` to replay all messages
+
+### Issue 3: HTTP Enrichment Timeout
+**Solution**: Enable parallel processing
+- Workflow → HTTP Request → Batching → Enable
+- Set batch size: 100
+- Set timeout: 30s
+
+## Related Commands
+
+- `/specweave-kafka:dev-env` - Set up local Kafka cluster
+- `/specweave-n8n:test-workflow` - Test workflow with sample data (coming soon)
+
+## Documentation
+
+- **n8n Kafka Nodes**: https://docs.n8n.io/integrations/builtin/app-nodes/n8n-nodes-base.kafka/
+- **Workflow Patterns**: `.specweave/docs/public/guides/n8n-kafka-patterns.md`
+- **Error Handling**: `.specweave/docs/public/guides/n8n-error-handling.md`
+
+---
+
+**Plugin**: specweave-n8n
+**Version**: 0.24.0
+**Status**: ✅ Production Ready
diff --git a/plugin.lock.json b/plugin.lock.json
new file mode 100644
index 0000000..91c65c2
--- /dev/null
+++ b/plugin.lock.json
@@ -0,0 +1,49 @@
+{
+  "$schema": "internal://schemas/plugin.lock.v1.json",
+  "pluginId": "gh:anton-abyzov/specweave:plugins/specweave-n8n",
+  "normalized": {
+    "repo": null,
+    "ref": "refs/tags/v20251128.0",
+    "commit": "6a0abdf590e2d6aee37d33770e518aa680f251b6",
+    "treeHash": "20c50c648828976dbebe8fbd76ead24a7f7e2f469612a81f18de9f92fa2e26e9",
+    "generatedAt": "2025-11-28T10:13:52.123675Z",
+    "toolVersion": "publish_plugins.py@0.2.0"
+  },
+  "origin": {
+    "remote": "git@github.com:zhongweili/42plugin-data.git",
+    "branch": "master",
+    "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390",
+    "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data"
+  },
+  "manifest": {
+    "name": "specweave-n8n",
+    "description": "n8n workflow automation integration with Kafka - Event-driven workflows, Kafka triggers, producers, consumers, and workflow patterns for no-code/low-code event processing",
+    "version": "0.24.0"
+  },
+  "content": {
+    "files": [
+      {
+        "path": "README.md",
+        "sha256": "97361cb00aa03a235b49f7ad350dea06f7453624a127793891413cb5d2035494"
+      },
+      {
+        "path": ".claude-plugin/plugin.json",
+        "sha256": "f4d4c818909ae1d83edbb3f84914260539b6af7f0df90dfb5c02b3a508acc79c"
+      },
+      {
+        "path": "commands/workflow-template.md",
+        "sha256": "7807b9e6f5c213225d9c97ed7c534277ce37133f97c33773da38d18f89d37a7e"
+      },
+      {
+        "path": "skills/n8n-kafka-workflows/SKILL.md",
+        "sha256": "357b628afa462880a934daa37ec1524b57587fb0f9543e75257903908a85eb2b"
+      }
+    ],
+    "dirSha256": "20c50c648828976dbebe8fbd76ead24a7f7e2f469612a81f18de9f92fa2e26e9"
+  },
+  "security": {
+    "scannedAt": null,
+    "scannerVersion": null,
+    "flags": []
+  }
+}
\ No newline at end of file
diff --git a/skills/n8n-kafka-workflows/SKILL.md b/skills/n8n-kafka-workflows/SKILL.md
new file mode 100644
index 0000000..191d4a8
--- /dev/null
+++ b/skills/n8n-kafka-workflows/SKILL.md
@@ -0,0 +1,504 @@
+---
+name: n8n-kafka-workflows
+description: n8n workflow automation with Kafka integration expert. 
Covers Kafka trigger node, producer node, event-driven workflows, error handling, retries, and no-code/low-code event processing patterns. Activates for n8n kafka, kafka trigger, kafka producer, n8n workflows, event-driven automation, no-code kafka, workflow patterns.
+---
+
+# n8n Kafka Workflows Skill
+
+Expert knowledge of integrating Apache Kafka with the n8n workflow automation platform for no-code/low-code event-driven processing.
+
+## What I Know
+
+### n8n Kafka Nodes
+
+**Kafka Trigger Node** (Event Consumer):
+- Triggers workflow on new Kafka messages
+- Supports consumer groups
+- Auto-commit or manual offset management
+- Multiple topic subscription
+- Message batching
+
+**Kafka Producer Node** (Event Publisher):
+- Sends messages to Kafka topics
+- Supports key-based partitioning
+- Header support
+- Compression (gzip, snappy, lz4)
+- Batch sending
+
+**Configuration**:
+```json
+{
+  "credentials": {
+    "kafkaApi": {
+      "brokers": "localhost:9092",
+      "clientId": "n8n-workflow",
+      "ssl": false,
+      "sasl": {
+        "mechanism": "plain",
+        "username": "{{$env.KAFKA_USER}}",
+        "password": "{{$env.KAFKA_PASSWORD}}"
+      }
+    }
+  }
+}
+```
+
+## When to Use This Skill
+
+Activate me when you need help with:
+- n8n Kafka setup ("Configure Kafka trigger in n8n")
+- Workflow patterns ("Event-driven automation with n8n")
+- Error handling ("Retry failed Kafka messages")
+- Integration patterns ("Enrich Kafka events with HTTP API")
+- Producer configuration ("Send messages to Kafka from n8n")
+- Consumer groups ("Process Kafka events in parallel")
+
+## Common Workflow Patterns
+
+### Pattern 1: Event-Driven Processing
+
+**Use Case**: Process Kafka events with HTTP API enrichment
+
+```
+[Kafka Trigger] → [HTTP Request] → [Transform] → [Database]
+      ↓
+  orders topic
+      ↓
+  Get customer data
+      ↓
+  Merge order + customer
+      ↓
+  Save to PostgreSQL
+```
+
+**n8n Workflow**:
+1. **Kafka Trigger**:
+   - Topic: `orders`
+   - Consumer Group: `order-processor`
+   - Offset: `latest`
+
+2. **HTTP Request** (Enrich):
+   - URL: `https://api.example.com/customers/{{$json.customerId}}`
+   - Method: GET
+   - Headers: `Authorization: Bearer {{$env.API_TOKEN}}`
+
+3. **Function Node** (Transform):
+   ```javascript
+   return {
+     orderId: $json.order.id,
+     customerId: $json.order.customerId,
+     customerName: $json.customer.name,
+     customerEmail: $json.customer.email,
+     total: $json.order.total,
+     timestamp: new Date().toISOString()
+   };
+   ```
+
+4. **PostgreSQL** (Save):
+   - Operation: INSERT
+   - Table: `enriched_orders`
+   - Columns: Mapped from Function node
+
+### Pattern 2: Fan-Out (Publish to Multiple Topics)
+
+**Use Case**: Single event triggers multiple downstream workflows
+
+```
+[Kafka Trigger] → [Switch] → [Kafka Producer] (topic: high-value-orders)
+      ↓              ↓
+  orders topic       └─→ [Kafka Producer] (topic: all-orders)
+                     └─→ [Kafka Producer] (topic: analytics)
+```
+
+**n8n Workflow**:
+1. **Kafka Trigger**: Consume `orders`
+2. **Switch Node**: Route by `total` value (a code-based alternative is sketched below)
+   - Route 1: `total > 1000` → `high-value-orders` topic
+   - Route 2: Always → `all-orders` topic
+   - Route 3: Always → `analytics` topic
+3. **Kafka Producer** (x3): Send to respective topics
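+
+If the Switch node's rules grow past simple comparisons, the same routing decision can live in a Function node that tags each event with its target topics — a sketch under the field names used above (`total` on the incoming order; the `topics` array is a hypothetical field that downstream producers would filter on):
+
+```javascript
+// Function Node (Route Fan-Out): tag each order with its destination topics
+const order = $json;
+const topics = ['all-orders', 'analytics']; // every order goes to these two
+
+if (order.total > 1000) {
+  topics.push('high-value-orders'); // same rule as Switch Route 1
+}
+
+return { ...order, topics };
+```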
+
+### Pattern 3: Retry with Dead Letter Queue (DLQ)
+
+**Use Case**: Retry failed messages, send to DLQ after 3 attempts
+
+```
+[Kafka Trigger] → [Try/Catch] → [Success] → [Kafka Producer] (topic: processed)
+      ↓               ↓
+  input topic    [Catch Error]
+                      ↓
+               [Increment Retry Count]
+                      ↓
+               [If Retry < 3]
+                      ↓ Yes
+               [Kafka Producer] (topic: input-retry)
+                      ↓ No
+               [Kafka Producer] (topic: dlq)
+```
+
+**n8n Workflow**:
+1. **Kafka Trigger**: `input` topic
+2. **Try Node**: HTTP Request (may fail)
+3. **Catch Node** (Error Handler):
+   - Get retry count from message headers
+   - Increment retry count
+   - If retry < 3: Send to `input-retry` topic
+   - Else: Send to `dlq` topic
+
+### Pattern 4: Batch Processing with Aggregation
+
+**Use Case**: Aggregate 100 events, send batch to API
+
+```
+[Kafka Trigger] → [Aggregate] → [HTTP Request] → [Kafka Producer]
+      ↓               ↓
+  events topic   Buffer 100 msgs
+                      ↓
+               Send batch to API
+                      ↓
+               Publish results
+```
+
+**n8n Workflow**:
+1. **Kafka Trigger**: Enable batching (100 messages)
+2. **Aggregate Node**: Combine into array
+3. **HTTP Request**: POST batch
+4. **Kafka Producer**: Send results
+
+### Pattern 5: Change Data Capture (CDC) to Kafka
+
+**Use Case**: Stream database changes to Kafka
+
+```
+[Cron Trigger] → [PostgreSQL] → [Compare] → [Kafka Producer]
+      ↓               ↓              ↓
+  Every 1 min    Get new rows   Find diffs
+                                     ↓
+                              Publish changes
+```
+
+**n8n Workflow**:
+1. **Cron**: Every 1 minute
+2. **PostgreSQL**: SELECT new rows (WHERE updated_at > last_run)
+3. **Function Node**: Detect changes (INSERT/UPDATE/DELETE)
+4. **Kafka Producer**: Send CDC events
+
+## Best Practices
+
+### 1. Use Consumer Groups for Parallel Processing
+
+✅ **DO**:
+```
+Workflow Instance 1:
+  Consumer Group: order-processor
+  Partition: 0, 1, 2
+
+Workflow Instance 2:
+  Consumer Group: order-processor
+  Partition: 3, 4, 5
+```
+
+❌ **DON'T**:
+```
+// WRONG: No consumer group (all instances get all messages!)
+Consumer Group: (empty)
+```
+
+### 2. Handle Errors with Try/Catch
+
+✅ **DO**:
+```
+[Kafka Trigger]
+      ↓
+[Try] → [HTTP Request] → [Success Handler]
+      ↓
+[Catch] → [Error Handler] → [Kafka DLQ]
+```
+
+❌ **DON'T**:
+```
+// WRONG: No error handling (workflow crashes on failure!)
+[Kafka Trigger] → [HTTP Request] → [Database]
+```
+
+### 3. Use Environment Variables for Credentials
+
+✅ **DO**:
+```
+Kafka Brokers: {{$env.KAFKA_BROKERS}}
+SASL Username: {{$env.KAFKA_USER}}
+SASL Password: {{$env.KAFKA_PASSWORD}}
+```
+
+❌ **DON'T**:
+```
+// WRONG: Hardcoded credentials in workflow!
+Kafka Brokers: "localhost:9092"
+SASL Username: "admin"
+SASL Password: "admin-secret"
+```
+
+### 4. Set Explicit Partitioning Keys
+
+✅ **DO**:
+```
+Kafka Producer:
+  Topic: orders
+  Key: {{$json.customerId}}  // Partition by customer
+  Message: {{$json}}
+```
+
+❌ **DON'T**:
+```
+// WRONG: No key (random partitioning!)
+Kafka Producer:
+  Topic: orders
+  Message: {{$json}}
+```
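+
+What the key buys you: Kafka hashes the message key to choose a partition, so all events for one customer land on one partition and are consumed in order. A minimal kafkajs illustration (hypothetical events; kafkajs is an assumption here, not something n8n requires):
+
+```javascript
+// key-demo.js — same key → same partition → per-customer ordering
+const { Kafka } = require('kafkajs');
+
+const producer = new Kafka({ clientId: 'key-demo', brokers: ['localhost:9092'] }).producer();
+
+async function demo() {
+  await producer.connect();
+  await producer.send({
+    topic: 'orders',
+    messages: [
+      // Both events key on customer 42, so 'created' is always read before 'paid'
+      { key: '42', value: JSON.stringify({ orderId: 1, status: 'created' }) },
+      { key: '42', value: JSON.stringify({ orderId: 1, status: 'paid' }) },
+    ],
+  });
+  await producer.disconnect();
+}
+
+demo().catch(console.error);
+```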
+### 5. Monitor Consumer Lag
+
+**Set up Prometheus metrics export**:
+```
+[Cron Trigger] → [Kafka Admin] → [Get Consumer Lag] → [Prometheus]
+      ↓               ↓                  ↓
+  Every 30s    List consumer groups   Calculate lag
+                                          ↓
+                                   Push to Pushgateway
+```
+
+## Error Handling Strategies
+
+### Strategy 1: Exponential Backoff Retry
+
+```javascript
+// Function Node (Calculate Backoff)
+const retryCount = $json.headers?.['retry-count'] || 0;
+const backoffMs = Math.min(1000 * Math.pow(2, retryCount), 60000); // Max 60 seconds
+
+return {
+  retryCount: retryCount + 1,
+  backoffMs,
+  nextRetryAt: new Date(Date.now() + backoffMs).toISOString()
+};
+```
+
+**Workflow**:
+1. Try processing
+2. On failure: Calculate backoff
+3. Wait (using Wait node)
+4. Retry (send to retry topic)
+5. If max retries reached: Send to DLQ
+
+### Strategy 2: Circuit Breaker
+
+```javascript
+// Function Node (Check Failure Rate)
+const failures = $json.metrics.failures || 0;
+const total = $json.metrics.total || 1;
+const failureRate = failures / total;
+
+if (failureRate > 0.5) {
+  // Circuit open (too many failures)
+  return { circuitState: 'OPEN', skipProcessing: true };
+}
+
+return { circuitState: 'CLOSED', skipProcessing: false };
+```
+
+**Workflow**:
+1. Track success/failure metrics
+2. Calculate failure rate
+3. If >50% failures: Open circuit (stop processing)
+4. Wait 30 seconds
+5. Try single request (half-open)
+6. If success: Close circuit (resume)
+
+### Strategy 3: Idempotent Processing
+
+```javascript
+// Function Node (Deduplication)
+// Note: $('Redis') stands in for your Redis lookup (e.g., a prior Redis node);
+// n8n does not expose get/set on node references directly.
+const messageId = $json.headers?.['message-id'];
+const alreadyProcessed = await $('Redis').get(messageId);
+
+if (alreadyProcessed) {
+  // Already processed, skip
+  return { skip: true, reason: 'duplicate' };
+}
+
+// Process, then cache the ID for one hour
+await $('Redis').set(messageId, 'processed', { ttl: 3600 });
+return { skip: false };
+```
+
+**Workflow**:
+1. Extract message ID
+2. Check Redis cache
+3. If exists: Skip processing
+4. Process message
+5. Store message ID in cache (1 hour TTL)
+
+## Performance Optimization
+
+### 1. Batch Processing
+
+**Enable batching in Kafka Trigger**:
+```
+Kafka Trigger:
+  Batch Size: 100
+  Batch Timeout: 5000ms  // Max wait 5 seconds
+```
+
+**Process batch**:
+```javascript
+// Function Node (Batch Transform) — "Run Once for All Items" mode
+const events = $input.all();
+
+// n8n expects an array of { json: ... } items back
+return events.map(event => ({
+  json: {
+    id: event.json.id,
+    timestamp: event.json.timestamp,
+    processed: true
+  }
+}));
+```
+
+### 2. Parallel Processing with Split in Batches
+
+```
+[Kafka Trigger] → [Split in Batches] → [HTTP Request] → [Aggregate]
+      ↓                  ↓                   ↓
+  1000 events      100 at a time     Parallel API calls
+                                          ↓
+                                   Combine results
+```
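+
+The same idea in a single Function node, if you would rather not wire up Split in Batches — chunk the items and fan out one HTTP call per item in each chunk (a sketch; the endpoint is hypothetical, and `fetch` availability depends on your n8n/Node.js version):
+
+```javascript
+// Function Node (Chunked Parallel Enrichment)
+const items = $input.all();
+const chunkSize = 100;
+const results = [];
+
+for (let i = 0; i < items.length; i += chunkSize) {
+  const chunk = items.slice(i, i + chunkSize);
+  // One request per item in the chunk, issued in parallel
+  const responses = await Promise.all(
+    chunk.map(item =>
+      fetch('https://api.example.com/enrich', {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify(item.json),
+      }).then(res => res.json())
+    )
+  );
+  results.push(...responses.map(json => ({ json })));
+}
+
+return results;
+```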
+### 3. Use Compression
+
+**Kafka Producer**:
+```
+Compression: lz4   // Or gzip, snappy
+Batch Size: 1000   // Larger batches = better compression
+```
+
+## Integration Patterns
+
+### Pattern 1: Kafka + HTTP API Enrichment
+
+```
+[Kafka Trigger] → [HTTP Request] → [Transform] → [Kafka Producer]
+      ↓                ↓                ↓
+  Raw events     Enrich from API   Combine data
+                                        ↓
+                                 Publish enriched
+```
+
+### Pattern 2: Kafka + Database Sync
+
+```
+[Kafka Trigger] → [PostgreSQL Upsert] → [Kafka Producer]
+      ↓                  ↓                    ↓
+  CDC events      Update database    Publish success/failure
+```
+
+### Pattern 3: Kafka + Email Notifications
+
+```
+[Kafka Trigger] → [If Critical] → [Send Email] → [Kafka Producer]
+      ↓                ↓               ↓
+   Alerts      severity=critical  Notify admin
+                                       ↓
+                               Publish alert sent
+```
+
+### Pattern 4: Kafka + Slack Alerts
+
+```
+[Kafka Trigger] → [Transform] → [Slack] → [Kafka Producer]
+      ↓               ↓            ↓
+   Errors       Format message  Send to #alerts
+                                    ↓
+                            Publish notification
+```
+
+## Testing n8n Workflows
+
+### Manual Testing
+
+1. **Test with Sample Data**:
+   - Right-click node → "Add Sample Data"
+   - Execute workflow
+   - Check outputs
+
+2. **Test Kafka Producer**:
+   ```bash
+   # Consume test topic
+   kcat -C -b localhost:9092 -t test-output -o beginning
+   ```
+
+3. **Test Kafka Trigger**:
+   ```bash
+   # Produce test message
+   echo '{"test": "data"}' | kcat -P -b localhost:9092 -t test-input
+   ```
+
+### Automated Testing
+
+**n8n CLI**:
+```bash
+# Execute a saved workflow by ID (import it first)
+n8n execute --id=123
+
+# Export workflow
+n8n export:workflow --id=123 --output=workflow.json
+```
+
+## Common Issues & Solutions
+
+### Issue 1: Consumer Lag Building Up
+
+**Symptoms**: Processing is slower than message arrival
+
+**Solutions**:
+1. Increase consumer group size (parallel processing)
+2. Enable batching (process 100 messages at once)
+3. Optimize HTTP requests (use connection pooling)
+4. Use Split in Batches for parallel processing
+
+### Issue 2: Duplicate Messages
+
+**Cause**: At-least-once delivery, no deduplication
+
+**Solution**: Add an idempotency check:
+```javascript
+// Check if the message was already processed
+// ($('Redis') is shorthand for your Redis lookup — see Strategy 3 above)
+const messageId = $json.headers?.['message-id'];
+const exists = await $('Redis').exists(messageId);
+
+if (exists) {
+  return { skip: true };
+}
+```
+
+### Issue 3: Workflow Execution Timeout
+
+**Cause**: Long-running HTTP requests
+
+**Solution**: Use async patterns:
+```
+[Kafka Trigger] → [Webhook] → [Wait for Webhook] → [Process Response]
+      ↓                             ↓
+  Trigger job                Async callback
+                                    ↓
+                            Continue workflow
+```
+
+## References
+
+- n8n Kafka Trigger: https://docs.n8n.io/integrations/builtin/trigger-nodes/n8n-nodes-base.kafkatrigger/
+- n8n Kafka Producer: https://docs.n8n.io/integrations/builtin/app-nodes/n8n-nodes-base.kafka/
+- n8n Best Practices: https://docs.n8n.io/hosting/scaling/best-practices/
+- Workflow Examples: https://n8n.io/workflows
+
+---
+
+**Invoke me when you need n8n Kafka integration, workflow automation, or event-driven no-code patterns!**