---
name: n8n-kafka-workflows
description: n8n workflow automation with Kafka integration expert. Covers Kafka trigger node, producer node, event-driven workflows, error handling, retries, and no-code/low-code event processing patterns. Activates for n8n kafka, kafka trigger, kafka producer, n8n workflows, event-driven automation, no-code kafka, workflow patterns.
---

# n8n Kafka Workflows Skill

Expert knowledge of integrating Apache Kafka with the n8n workflow automation platform for no-code/low-code event-driven processing.

## What I Know

### n8n Kafka Nodes

**Kafka Trigger Node** (Event Consumer):
- Triggers a workflow on new Kafka messages
- Supports consumer groups
- Auto-commit or manual offset management
- Multiple topic subscription
- Message batching

**Kafka Producer Node** (Event Publisher):
- Sends messages to Kafka topics
- Supports key-based partitioning
- Header support
- Compression (gzip, snappy, lz4)
- Batch sending

**Configuration**:

```json
{
  "credentials": {
    "kafkaApi": {
      "brokers": "localhost:9092",
      "clientId": "n8n-workflow",
      "ssl": false,
      "sasl": {
        "mechanism": "plain",
        "username": "{{$env.KAFKA_USER}}",
        "password": "{{$env.KAFKA_PASSWORD}}"
      }
    }
  }
}
```

## When to Use This Skill

Activate me when you need help with:

- n8n Kafka setup ("Configure Kafka trigger in n8n")
- Workflow patterns ("Event-driven automation with n8n")
- Error handling ("Retry failed Kafka messages")
- Integration patterns ("Enrich Kafka events with HTTP API")
- Producer configuration ("Send messages to Kafka from n8n")
- Consumer groups ("Process Kafka events in parallel")

## Common Workflow Patterns

### Pattern 1: Event-Driven Processing

**Use Case**: Process Kafka events with HTTP API enrichment

```
[Kafka Trigger] → [HTTP Request] → [Transform] → [Database]
      ↓                 ↓                ↓              ↓
 orders topic    Get customer data  Merge order    Save to
                                    + customer     PostgreSQL
```

**n8n Workflow**:

1. **Kafka Trigger**:
   - Topic: `orders`
   - Consumer Group: `order-processor`
   - Offset: `latest`

2. **HTTP Request** (Enrich):
   - URL: `https://api.example.com/customers/{{$json.customerId}}`
   - Method: GET
   - Headers: `Authorization: Bearer {{$env.API_TOKEN}}`

3. **Function Node** (Transform):

   ```javascript
   return {
     orderId: $json.order.id,
     customerId: $json.order.customerId,
     customerName: $json.customer.name,
     customerEmail: $json.customer.email,
     total: $json.order.total,
     timestamp: new Date().toISOString()
   };
   ```

4. **PostgreSQL** (Save):
   - Operation: INSERT
   - Table: `enriched_orders`
   - Columns: Mapped from the Function node output

### Pattern 2: Fan-Out (Publish to Multiple Topics)

**Use Case**: A single event triggers multiple downstream workflows

```
[Kafka Trigger] → [Switch] ──→ [Kafka Producer] (topic: high-value-orders)
      ↓              ├───────→ [Kafka Producer] (topic: all-orders)
 orders topic        └───────→ [Kafka Producer] (topic: analytics)
```

**n8n Workflow**:

1. **Kafka Trigger**: Consume `orders`
2. **Switch Node**: Route by `total` value (see the sketch after this pattern)
   - Route 1: `total > 1000` → `high-value-orders` topic
   - Route 2: Always → `all-orders` topic
   - Route 3: Always → `analytics` topic
3. **Kafka Producer** (x3): Send to the respective topics
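The Switch node routes on plain field values, so it can help to compute the routing flag in a Function node placed between the trigger and the Switch. A minimal sketch, assuming the incoming order carries a numeric `total` field; the `isHighValue` flag name is a hypothetical example, not an n8n built-in:

```javascript
// Function Node (Tag order before the Switch node)
// `total` is assumed to be a numeric field on the incoming order;
// `isHighValue` is a made-up flag the Switch node can route on.
const order = $json;

return {
  ...order,
  isHighValue: Number(order.total) > 1000,   // matches Route 1 above
  taggedAt: new Date().toISOString()
};
```

The Switch node can then branch on `{{$json.isHighValue}}` for the high-value route, while the all-orders and analytics producers receive every event.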
### Pattern 3: Retry with Dead Letter Queue (DLQ)

**Use Case**: Retry failed messages, send to a DLQ after 3 attempts

```
[Kafka Trigger] → [Try/Catch] → [Success] → [Kafka Producer] (topic: processed)
      ↓               ↓
 input topic    [Catch Error]
                      ↓
          [Increment Retry Count]
                      ↓
              [If Retry < 3]
             ↓ Yes          ↓ No
  [Kafka Producer]     [Kafka Producer]
  (topic: input-retry) (topic: dlq)
```

**n8n Workflow**:

1. **Kafka Trigger**: `input` topic
2. **Try Node**: HTTP Request (may fail)
3. **Catch Node** (Error Handler):
   - Get the retry count from the message headers
   - Increment the retry count
   - If retry < 3: Send to the `input-retry` topic
   - Else: Send to the `dlq` topic

### Pattern 4: Batch Processing with Aggregation

**Use Case**: Aggregate 100 events, send the batch to an API

```
[Kafka Trigger] → [Aggregate] → [HTTP Request] → [Kafka Producer]
      ↓                ↓               ↓                 ↓
 events topic   Buffer 100 msgs  Send batch to API  Publish results
```

**n8n Workflow**:

1. **Kafka Trigger**: Enable batching (100 messages)
2. **Aggregate Node**: Combine into an array
3. **HTTP Request**: POST the batch
4. **Kafka Producer**: Send results

### Pattern 5: Change Data Capture (CDC) to Kafka

**Use Case**: Stream database changes to Kafka

```
[Cron Trigger] → [PostgreSQL] → [Compare] → [Kafka Producer]
      ↓              ↓              ↓              ↓
 Every 1 min    Get new rows    Find diffs   Publish changes
```

**n8n Workflow**:

1. **Cron**: Every 1 minute
2. **PostgreSQL**: SELECT new rows (WHERE updated_at > last_run)
3. **Function Node**: Detect changes (INSERT/UPDATE/DELETE)
4. **Kafka Producer**: Send CDC events

## Best Practices

### 1. Use Consumer Groups for Parallel Processing

✅ **DO**:

```
Workflow Instance 1:
  Consumer Group: order-processor
  Partitions: 0, 1, 2

Workflow Instance 2:
  Consumer Group: order-processor
  Partitions: 3, 4, 5
```

❌ **DON'T**:

```
// WRONG: No consumer group (all instances get all messages!)
Consumer Group: (empty)
```

### 2. Handle Errors with Try/Catch

✅ **DO**:

```
[Kafka Trigger]
      ↓
   [Try] → [HTTP Request] → [Success Handler]
      ↓
   [Catch] → [Error Handler] → [Kafka DLQ]
```

❌ **DON'T**:

```
// WRONG: No error handling (workflow crashes on failure!)
[Kafka Trigger] → [HTTP Request] → [Database]
```

### 3. Use Environment Variables for Credentials

✅ **DO**:

```
Kafka Brokers: {{$env.KAFKA_BROKERS}}
SASL Username: {{$env.KAFKA_USER}}
SASL Password: {{$env.KAFKA_PASSWORD}}
```

❌ **DON'T**:

```
// WRONG: Hardcoded credentials in the workflow!
Kafka Brokers: "localhost:9092"
SASL Username: "admin"
SASL Password: "admin-secret"
```

### 4. Set Explicit Partitioning Keys

✅ **DO**:

```
Kafka Producer:
  Topic: orders
  Key: {{$json.customerId}}   // Partition by customer
  Message: {{$json}}
```

❌ **DON'T**:

```
// WRONG: No key (random partitioning!)
Kafka Producer:
  Topic: orders
  Message: {{$json}}
```

### 5. Monitor Consumer Lag

**Set up a Prometheus metrics export**:

```
[Cron Trigger] → [Kafka Admin] → [Get Consumer Lag] → [Prometheus]
      ↓                ↓                  ↓                 ↓
  Every 30s   List consumer groups   Calculate lag   Push to Pushgateway
```

## Error Handling Strategies

### Strategy 1: Exponential Backoff Retry

```javascript
// Function Node (Calculate Backoff)
const retryCount = Number($json.headers?.['retry-count'] ?? 0);    // header values arrive as strings
const backoffMs = Math.min(1000 * Math.pow(2, retryCount), 60000); // cap at 60 seconds

return {
  retryCount: retryCount + 1,
  backoffMs,
  nextRetryAt: new Date(Date.now() + backoffMs).toISOString()
};
```

**Workflow**:

1. Try processing
2. On failure: Calculate the backoff
3. Wait (using the Wait node)
4. Retry (send to the retry topic, as shown in the sketch below)
5. If max retries reached: Send to the DLQ
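When republishing, the producer needs the updated retry count so the next attempt can back off further, and it needs to know whether to target the retry topic or the DLQ. A minimal Function-node sketch, assuming the retry count travels in a `retry-count` message header and the topic names follow Pattern 3 above (`input-retry`, `dlq`); these field and header names are placeholders, not fixed n8n properties:

```javascript
// Function Node (Build retry or DLQ message)
// Assumptions: retry count in a 'retry-count' header, retry/DLQ topics named
// 'input-retry' and 'dlq', failed payload available on $json (all hypothetical).
const MAX_RETRIES = 3;
const retryCount = Number($json.headers?.['retry-count'] ?? 0) + 1;

return {
  topic: retryCount <= MAX_RETRIES ? 'input-retry' : 'dlq',
  headers: {
    'retry-count': String(retryCount),
    'last-error': $json.error?.message ?? 'unknown'
  },
  payload: $json
};
```

A downstream Kafka Producer node can then take its topic from `{{$json.topic}}` and forward the headers, so a single branch covers both the retry and DLQ paths.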
### Strategy 2: Circuit Breaker

```javascript
// Function Node (Check Failure Rate)
const failures = $json.metrics?.failures || 0;
const total = $json.metrics?.total || 1;
const failureRate = failures / total;

if (failureRate > 0.5) {
  // Circuit open (too many failures)
  return { circuitState: 'OPEN', skipProcessing: true };
}

return { circuitState: 'CLOSED', skipProcessing: false };
```

**Workflow**:

1. Track success/failure metrics
2. Calculate the failure rate
3. If >50% failures: Open the circuit (stop processing)
4. Wait 30 seconds
5. Try a single request (half-open)
6. If it succeeds: Close the circuit (resume)

### Strategy 3: Idempotent Processing

```javascript
// Function Node (Deduplication) – conceptual sketch: in practice the
// lookup and store are done by dedicated Redis nodes around this node
const messageId = $json.headers?.['message-id'];
const cached = await $('Redis').get(messageId);   // pseudocode for a Redis GET

if (cached) {
  // Already processed, skip
  return { skip: true, reason: 'duplicate' };
}

// Process and cache
await $('Redis').set(messageId, 'processed', { ttl: 3600 });   // pseudocode for a Redis SET with a 1-hour TTL
return { skip: false };
```

**Workflow**:

1. Extract the message ID
2. Check the Redis cache
3. If it exists: Skip processing
4. Process the message
5. Store the message ID in the cache (1-hour TTL)

## Performance Optimization

### 1. Batch Processing

**Enable batching in the Kafka Trigger**:

```
Kafka Trigger:
  Batch Size: 100
  Batch Timeout: 5000ms   // Max wait 5 seconds
```

**Process the batch**:

```javascript
// Function Node (Batch Transform)
const events = $input.all();

const transformed = events.map(event => ({
  id: event.json.id,
  timestamp: event.json.timestamp,
  processed: true
}));

return transformed;
```

### 2. Parallel Processing with Split in Batches

```
[Kafka Trigger] → [Split in Batches] → [HTTP Request] → [Aggregate]
      ↓                  ↓                    ↓               ↓
 1000 events       100 at a time     Parallel API calls  Combine results
```

### 3. Use Compression

**Kafka Producer**:

```
Compression: lz4   // or gzip, snappy
Batch Size: 1000   // larger batches compress better
```

## Integration Patterns

### Pattern 1: Kafka + HTTP API Enrichment

```
[Kafka Trigger] → [HTTP Request] → [Transform] → [Kafka Producer]
      ↓                 ↓               ↓                ↓
  Raw events     Enrich from API   Combine data   Publish enriched
```

### Pattern 2: Kafka + Database Sync

```
[Kafka Trigger] → [PostgreSQL Upsert] → [Kafka Producer]
      ↓                    ↓                     ↓
  CDC events        Update database     Publish success/failure
```

### Pattern 3: Kafka + Email Notifications

```
[Kafka Trigger] → [If Critical] → [Send Email] → [Kafka Producer]
      ↓                 ↓               ↓                ↓
    Alerts      severity=critical  Notify admin   Publish alert sent
```

### Pattern 4: Kafka + Slack Alerts

```
[Kafka Trigger] → [Transform] → [Slack] → [Kafka Producer]
      ↓               ↓              ↓               ↓
    Errors      Format message  Send to #alerts  Publish notification
```

## Testing n8n Workflows

### Manual Testing

1. **Test with Sample Data**:
   - Right-click a node → "Add Sample Data"
   - Execute the workflow
   - Check the outputs

2. **Test the Kafka Producer**:

   ```bash
   # Consume the test topic
   kcat -C -b localhost:9092 -t test-output -o beginning
   ```

3. **Test the Kafka Trigger**:

   ```bash
   # Produce a test message
   echo '{"test": "data"}' | kcat -P -b localhost:9092 -t test-input
   ```

### Automated Testing

**n8n CLI**:

```bash
# Execute a workflow from a file
n8n execute --file=workflow.json

# Export a workflow
n8n export:workflow --id=123 --output=workflow.json
```
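For end-to-end smoke tests it also helps to generate synthetic input rather than waiting for real traffic. A minimal Function-node sketch that fabricates a few fake order events for a Kafka Producer node to publish to a test topic; every field name and value range here is made up for illustration:

```javascript
// Function Node (Generate synthetic test orders)
// All field names and value ranges are illustrative only.
const count = 5;

return Array.from({ length: count }, (_, i) => ({
  json: {
    orderId: `test-${Date.now()}-${i}`,
    customerId: `cust-${(i % 3) + 1}`,
    total: Math.round(Math.random() * 2000 * 100) / 100,
    createdAt: new Date().toISOString()
  }
}));
```

Wired as Manual Trigger → Function → Kafka Producer, this gives a repeatable way to exercise the consuming workflows against a test topic without touching production data.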
## Common Issues & Solutions

### Issue 1: Consumer Lag Building Up

**Symptoms**: Processing is slower than message arrival

**Solutions**:

1. Increase the consumer group size (parallel processing)
2. Enable batching (process 100 messages at once)
3. Optimize HTTP requests (use connection pooling)
4. Use Split in Batches for parallel processing

### Issue 2: Duplicate Messages

**Cause**: At-least-once delivery, no deduplication

**Solution**: Add an idempotency check:

```javascript
// Check if the message was already processed (Redis lookup shown as pseudocode)
const messageId = $json.headers?.['message-id'];
const exists = await $('Redis').exists(messageId);
if (exists) {
  return { skip: true };
}
```

### Issue 3: Workflow Execution Timeout

**Cause**: Long-running HTTP requests

**Solution**: Use async patterns:

```
[Kafka Trigger] → [Webhook] → [Wait for Webhook] → [Process Response]
                      ↓                ↓                   ↓
                 Trigger job    Async callback     Continue workflow
```

## References

- n8n Kafka Trigger: https://docs.n8n.io/integrations/builtin/trigger-nodes/n8n-nodes-base.kafkatrigger/
- n8n Kafka Producer: https://docs.n8n.io/integrations/builtin/app-nodes/n8n-nodes-base.kafka/
- n8n Best Practices: https://docs.n8n.io/hosting/scaling/best-practices/
- Workflow Examples: https://n8n.io/workflows

---

**Invoke me when you need n8n Kafka integration, workflow automation, or event-driven no-code patterns!**