---
name: specweave-n8n:workflow-template
description: Generate n8n workflow JSON template with Kafka trigger/producer nodes. Creates event-driven workflow patterns (fan-out, retry+DLQ, enrichment, CDC).
---

# Generate n8n Workflow Template

Create ready-to-use n8n workflow JSON files with Kafka integration patterns.

## What This Command Does

1. **Select Pattern**: Choose from common event-driven patterns
2. **Configure Kafka**: Specify topics, consumer groups, brokers
3. **Customize Workflow**: Add enrichment, transformations, error handling
4. **Generate JSON**: Export an n8n-compatible workflow file
5. **Import Instructions**: Learn how to load the workflow into the n8n UI

## Available Patterns

### Pattern 1: Kafka Trigger → HTTP Enrichment → Kafka Producer

**Use Case**: Event enrichment with an external API

**Workflow**:

```
[Kafka Trigger] → [HTTP Request] → [Set/Transform] → [Kafka Producer]
     ↓                 ↓                  ↓
  Input topic      Enrich data        Output topic
```

**Configuration**:

- Input topic (e.g., `orders`)
- API endpoint (e.g., `https://api.example.com/customers/{id}`)
- Output topic (e.g., `enriched-orders`; see the example below)
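
A quick way to exercise this pattern end to end is with kcat. The payload fields mirror the template defaults used later in this document; the enriched output shape is illustrative:

```bash
# Feed a sample event into the input topic...
echo '{"orderId": 123, "customerId": 456}' | \
  kcat -P -b localhost:9092 -t orders

# ...and watch the output topic for the enriched record, e.g.
# {"orderId": 123, "customerName": "Jane Doe"}
kcat -C -b localhost:9092 -t enriched-orders -o end
```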

### Pattern 2: Kafka Trigger → Fan-Out

**Use Case**: Single event triggers multiple downstream topics

**Workflow**:

```
[Kafka Trigger] → [Switch] ──→ [Kafka Producer] (high-priority)
     ↓               ├───────→ [Kafka Producer] (all-events)
  Input topic        └───────→ [Kafka Producer] (analytics)
```
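
The fan-out pattern assumes the downstream topics already exist. A minimal sketch for creating them on a local single-broker cluster; the topic names match the diagram, and the partition and replication settings are assumptions:

```bash
# Create the three downstream topics on a local cluster
for topic in high-priority all-events analytics; do
  kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic "$topic" --partitions 3 --replication-factor 1
done
```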

### Pattern 3: Retry with Dead Letter Queue

**Use Case**: Fault-tolerant processing with retry logic

**Workflow**:

```
[Kafka Trigger] → [Try] → [Process] → [Kafka Producer] (success)
     ↓              ↓
  Input topic    [Catch] → [Increment Retry Count]
                                 ↓
                            retry < 3 ?
                        yes ↓        ↓ no
  [Kafka Producer] (retry-topic)   [Kafka Producer] (dlq-topic)
```
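
Once messages start landing on the dead letter queue, you can inspect them with kcat; the topic name matches the diagram and is illustrative:

```bash
# Dump everything parked on the DLQ, then exit
kcat -C -b localhost:9092 -t dlq-topic -o beginning -e
```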

### Pattern 4: Change Data Capture (CDC)

**Use Case**: Database polling → Kafka events

**Workflow**:

```
[Cron: Every 1m] → [PostgreSQL Query] → [Compare] → [Kafka Producer]
                         ↓                   ↓              ↓
                    Get new rows       Detect changes   Publish CDC events
```
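
As a sketch of what the PostgreSQL node might run on each tick, assuming the source table carries an updated_at column (the table and column names are assumptions, not part of the generated template):

```bash
# Hypothetical polling query: rows changed since the last 1-minute tick
psql "$DATABASE_URL" -c \
  "SELECT * FROM orders WHERE updated_at > NOW() - INTERVAL '1 minute';"
```

In practice the Compare step should keep a watermark (e.g., the highest updated_at seen so far) rather than relying on wall-clock intervals, so slow ticks neither miss nor double-publish rows.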

## Example Usage

```bash
# Generate workflow template
/specweave-n8n:workflow-template

# I'll ask:
# 1. Which pattern? (Enrichment, Fan-Out, Retry+DLQ, CDC)
# 2. Input topic name?
# 3. Output topic(s)?
# 4. Kafka broker (default: localhost:9092)?
# 5. Consumer group name?

# Then I'll generate:
# - workflow.json (importable into n8n)
# - README.md with setup instructions
# - .env.example with required variables
```

## Generated Files

### 1. workflow.json: n8n workflow definition

```json
{
  "name": "Kafka Event Enrichment",
  "nodes": [
    {
      "type": "n8n-nodes-base.kafkaTrigger",
      "name": "Kafka Trigger",
      "parameters": {
        "topic": "orders",
        "groupId": "order-processor",
        "brokers": "localhost:9092"
      }
    },
    {
      "type": "n8n-nodes-base.httpRequest",
      "name": "Enrich Customer Data",
      "parameters": {
        "url": "=https://api.example.com/customers/{{$json.customerId}}",
        "authentication": "genericCredentialType"
      }
    },
    {
      "type": "n8n-nodes-base.set",
      "name": "Transform",
      "parameters": {
        "values": {
          "string": [
            { "name": "orderId", "value": "={{$json.order.id}}" },
            { "name": "customerName", "value": "={{$json.customer.name}}" }
          ]
        }
      }
    },
    {
      "type": "n8n-nodes-base.kafka",
      "name": "Kafka Producer",
      "parameters": {
        "topic": "enriched-orders",
        "brokers": "localhost:9092"
      }
    }
  ],
  "connections": { ... }
}
```

### 2. README.md: Import instructions

```markdown
# Import Workflow into n8n

1. Open the n8n UI (http://localhost:5678)
2. Click "Workflows" → "Import from File"
3. Select workflow.json
4. Configure credentials (Kafka, HTTP API)
5. Activate the workflow
6. Test with a sample event
```

### 3. .env.example: Required environment variables

```bash
KAFKA_BROKERS=localhost:9092
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-password
API_ENDPOINT=https://api.example.com
API_TOKEN=your-api-token
```

## Import into n8n

**Via UI**:

1. n8n Dashboard → Workflows → Import from File
2. Select the generated workflow.json
3. Configure Kafka credentials
4. Activate the workflow

**Via CLI**:

```bash
# Import workflow
n8n import:workflow --input=workflow.json

# List workflows
n8n list:workflow
```

## Configuration Options

### Kafka Settings

- **Brokers**: Comma-separated list (e.g., `broker1:9092,broker2:9092`)
- **Consumer Group**: Unique identifier for this workflow
- **Offset**: `earliest` (replay) or `latest` (new messages only)
- **Auto Commit**: `true` (recommended) or `false` (manual)
- **SSL/SASL**: Authentication credentials for secure clusters (see the connectivity sketch below)
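
Before wiring credentials into n8n, it can help to verify them with kcat. A minimal sketch using the variables from the generated .env.example; the SASL_SSL protocol and PLAIN mechanism are assumptions, so match them to your cluster:

```bash
# Probe a secured cluster with the same credentials the workflow will use
kcat -L -b "$KAFKA_BROKERS" \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username="$KAFKA_SASL_USERNAME" \
  -X sasl.password="$KAFKA_SASL_PASSWORD"
```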

### Error Handling

- **Retry Count**: Maximum retries before DLQ (default: 3)
- **Backoff Strategy**: Exponential (1s, 2s, 4s, 8s)
- **DLQ Topic**: Dead letter queue for failed messages (inspection sketch below)
- **Alert on Failure**: Send Slack/email notification
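
To see how far a message got before it was parked, print its headers alongside the payload. The retryCount header name is an assumption about how the retry pattern tags messages, not something the generated workflow guarantees:

```bash
# Print headers (e.g., a retryCount set by the retry pattern) with each payload
kcat -C -b localhost:9092 -t dlq-topic \
  -f 'Headers: %h\nPayload: %s\n\n' -e
```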

### Performance

- **Batch Size**: Process N messages at once (default: 1; see the burst test below)
- **Batch Timeout**: Wait up to N ms for a batch (default: 5000)
- **Parallel Execution**: Enable for HTTP enrichment (default: disabled)
- **Max Memory**: Limit workflow memory usage
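
To observe how the trigger groups messages under different batch settings, push a burst of events in one shot. A minimal sketch; the event shape is illustrative:

```bash
# Produce 100 events at once to exercise batch size and timeout
for i in $(seq 1 100); do
  echo "{\"orderId\": $i, \"customerId\": 456}"
done | kcat -P -b localhost:9092 -t orders
```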

## Testing

**Manual Test**:

```bash
# 1. Produce a test event
echo '{"orderId": 123, "customerId": 456}' | \
  kcat -P -b localhost:9092 -t orders

# 2. Check the n8n execution log
# n8n UI → Executions → View latest run

# 3. Consume the output
kcat -C -b localhost:9092 -t enriched-orders
```

**Automated Test**:

```bash
# Execute the imported workflow by ID via the n8n CLI
n8n execute --id <workflow-id>

# Expected output: success status
```

## Troubleshooting

### Issue 1: Workflow Not Triggering

**Solution**: Check the Kafka connection

```bash
# Test Kafka connectivity
kcat -L -b localhost:9092

# Verify the consumer group is registered
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor
```

### Issue 2: Messages Not Being Consumed

**Solution**: Check the offset position

- n8n UI → Workflow → Kafka Trigger → Offset
- Set to `earliest` to replay all messages (or reset offsets from the CLI, as shown below)
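
If the consumer group has already committed offsets past the messages you want, setting the trigger to `earliest` alone will not rewind it. A sketch of resetting the group from the CLI; deactivate the workflow first so the group is inactive:

```bash
# Rewind the workflow's consumer group to the start of the input topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-earliest --execute
```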

### Issue 3: HTTP Enrichment Timeout

**Solution**: Enable parallel processing

- Workflow → HTTP Request → Batching → Enable
- Set batch size: 100
- Set timeout: 30s

## Related Commands

- `/specweave-kafka:dev-env` - Set up local Kafka cluster
- `/specweave-n8n:test-workflow` - Test workflow with sample data (coming soon)

**Plugin**: specweave-n8n | **Version**: 1.0.0 | **Status**: Production Ready