Initial commit

Zhongwei Li
2025-11-29 17:56:46 +08:00
commit 96a7ab295d
16 changed files with 4441 additions and 0 deletions

@@ -0,0 +1,647 @@
---
name: kafka-architecture
description: Expert knowledge of Apache Kafka architecture, cluster design, capacity planning, partitioning strategies, replication, and high availability. Auto-activates on keywords kafka architecture, cluster sizing, partition strategy, replication factor, kafka ha, kafka scalability, broker count, topic design, kafka performance, kafka capacity planning.
---
# Kafka Architecture & Design Expert
Comprehensive knowledge of Apache Kafka architecture patterns, cluster design principles, and production best practices for building resilient, scalable event streaming platforms.
## Core Architecture Concepts
### Kafka Cluster Components
**Brokers**:
- Individual Kafka servers that store and serve data
- Each broker handles thousands of partitions
- Typical: 3-10 brokers per cluster (small), 10-100+ (large enterprises)
**Controller**:
- One broker elected as controller (via KRaft or ZooKeeper)
- Manages partition leaders and replica assignments
- Failure triggers automatic re-election
**Topics**:
- Logical channels for message streams
- Divided into partitions for parallelism
- Can have different retention policies per topic
**Partitions**:
- Ordered, immutable sequence of records
- Unit of parallelism (at most 1 consumer per partition within a group)
- Distributed across brokers for load balancing
**Replicas**:
- Copies of partitions across multiple brokers
- 1 leader replica (serves reads/writes)
- N-1 follower replicas (replication only)
- In-Sync Replicas (ISR): The leader plus any followers fully caught up with it
### KRaft vs ZooKeeper Mode
**KRaft Mode** (Recommended, Kafka 3.3+):
```yaml
Cluster Metadata:
- Stored in Kafka itself (no external ZooKeeper)
- Metadata topic: __cluster_metadata
- Controller quorum (3 or 5 nodes)
- Faster failover (<1s vs 10-30s)
- Simplified operations
```
**ZooKeeper Mode** (Legacy; deprecated since Kafka 3.5, removed in 4.0):
```yaml
External Coordination:
- Requires separate ZooKeeper ensemble (3-5 nodes)
- Stores cluster metadata, configs, ACLs
- Slower failover (10-30 seconds)
- More complex to operate
```
**Migration**: ZooKeeper → KRaft migration supported in Kafka 3.6+
## Cluster Sizing Guidelines
### Small Cluster (Development/Testing)
```yaml
Configuration:
Brokers: 3
Partitions per broker: ~100-500
Total partitions: 300-1500
Replication factor: 3
Hardware:
- CPU: 4-8 cores
- RAM: 8-16 GB
- Disk: 500 GB - 1 TB SSD
- Network: 1 Gbps
Use Cases:
- Development environments
- Low-volume production (<10 MB/s)
- Proof of concepts
- Single datacenter
Example Workload:
- 50 topics
- 5-10 partitions per topic
- 1 million messages/day
- 7-day retention
```
### Medium Cluster (Standard Production)
```yaml
Configuration:
Brokers: 6-12
Partitions per broker: 500-2000
Total partitions: 3K-24K
Replication factor: 3
Hardware:
- CPU: 16-32 cores
- RAM: 64-128 GB
- Disk: 2-8 TB NVMe SSD
- Network: 10 Gbps
Use Cases:
- Standard production workloads
- Multi-team environments
- Regional deployments
- Up to 500 MB/s throughput
Example Workload:
- 200-500 topics
- 10-50 partitions per topic
- 100 million messages/day
- 30-day retention
```
### Large Cluster (High-Scale Production)
```yaml
Configuration:
Brokers: 20-100+
Partitions per broker: 2000-4000
Total partitions: 40K-400K+
Replication factor: 3
Hardware:
- CPU: 32-64 cores
- RAM: 128-256 GB
- Disk: 8-20 TB NVMe SSD
- Network: 25-100 Gbps
Use Cases:
- Large enterprises
- Multi-region deployments
- Event-driven architectures
- 1+ GB/s throughput
Example Workload:
- 1000+ topics
- 50-200 partitions per topic
- 1+ billion messages/day
- 90-365 day retention
```
### Kafka Streams / Exactly-Once Semantics (EOS) Clusters
```yaml
Configuration:
Brokers: 6-12+ (same as standard, but more control plane load)
Partitions per broker: 500-1500 (fewer due to transaction overhead)
Total partitions: 3K-18K
Replication factor: 3
Hardware:
- CPU: 16-32 cores (more CPU for transactions)
- RAM: 64-128 GB
- Disk: 4-12 TB NVMe SSD (more for transaction logs)
- Network: 10-25 Gbps
Special Considerations:
- More brokers due to transaction coordinator load
- Lower partition count per broker (transactions = more overhead)
- Higher disk IOPS for transaction logs
- min.insync.replicas=2 mandatory for EOS
- acks=all required for producers
Use Cases:
- Stream processing with exactly-once guarantees
- Financial transactions
- Event sourcing with strict ordering
- Multi-step workflows requiring atomicity
```
## Partitioning Strategy
### How Many Partitions?
**Formula**:
```
Partitions = max(
Target Throughput / Single Partition Throughput,
Number of Consumers (for parallelism),
Future Growth Factor (2-3x)
)
Single Partition Limits:
- Write throughput: ~10-50 MB/s
- Read throughput: ~30-100 MB/s
- Message rate: ~10K-100K msg/s
```
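As a quick sanity check, the formula can be encoded directly. The sketch below assumes the mid-range per-partition limits from the list above (20 MB/s write, 40 MB/s read); the interface and function names are illustrative, not part of any library.
```typescript
// Hypothetical helper encoding the sizing formula above; the per-partition
// throughput limits are assumptions taken from the ranges listed, not measurements.
interface PartitionSizingInput {
  targetWriteMBps: number; // expected produce throughput
  targetReadMBps: number;  // expected aggregate consume throughput
  minConsumers: number;    // desired consumer parallelism
  growthFactor: number;    // e.g. 2-3x headroom
}

function recommendPartitions(input: PartitionSizingInput): number {
  const writePartitionMBps = 20; // assumed mid-range single-partition write limit
  const readPartitionMBps = 40;  // assumed mid-range single-partition read limit
  const base = Math.max(
    Math.ceil(input.targetWriteMBps / writePartitionMBps),
    Math.ceil(input.targetReadMBps / readPartitionMBps),
    input.minConsumers,
  );
  return base * input.growthFactor;
}

// Matches the "High Throughput Topic" example below: 13 × 3 = 39
console.log(recommendPartitions({ targetWriteMBps: 200, targetReadMBps: 500, minConsumers: 10, growthFactor: 3 }));
```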
**Examples**:
**High Throughput Topic** (Logs, Events):
```yaml
Requirements:
- Write: 200 MB/s
- Read: 500 MB/s (multiple consumers)
- Expected growth: 3x in 1 year
Calculation:
Write partitions: 200 MB/s ÷ 20 MB/s = 10
Read partitions: 500 MB/s ÷ 40 MB/s = 13
Growth factor: 13 × 3 = 39
Recommendation: 40-50 partitions
```
**Low-Latency Topic** (Commands, Requests):
```yaml
Requirements:
- Write: 5 MB/s
- Read: 10 MB/s
- Latency: <10ms p99
- Order preservation: By user ID
Calculation:
Throughput partitions: 5 MB/s ÷ 20 MB/s = 1
  Parallelism: 4 (allows up to 4 consumers in a group)
Recommendation: 4-6 partitions (keyed by user ID)
```
**Dead Letter Queue**:
```yaml
Recommendation: 1-3 partitions
Reason: Low volume, order less important
```
### Partition Key Selection
**Good Keys** (High Cardinality, Even Distribution):
```yaml
✅ User ID (UUIDs):
- Millions of unique values
- Even distribution
- Example: "user-123e4567-e89b-12d3-a456-426614174000"
✅ Device ID (IoT):
- Unique per device
- Natural sharding
- Example: "device-sensor-001-zone-a"
✅ Order ID (E-commerce):
- Unique per transaction
- Even temporal distribution
- Example: "order-2024-11-15-abc123"
```
**Bad Keys** (Low Cardinality, Hotspots):
```yaml
❌ Country Code:
- Only ~200 values
- Uneven (US, CN >> others)
- Creates partition hotspots
❌ Boolean Flags:
- Only 2 values (true/false)
- Severe imbalance
❌ Date (YYYY-MM-DD):
- All today's traffic → 1 partition
- Temporal hotspot
```
**Compound Keys** (Best of Both):
```yaml
✅ Country + User ID:
- Partition by country for locality
- Sub-partition by user for distribution
- Example: "US:user-123" → hash("US:user-123")
✅ Tenant + Event Type + Timestamp:
- Multi-tenant isolation
- Event type grouping
- Temporal ordering
```
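Under the hood, the default partitioner maps a key to a partition by hashing the key bytes (murmur2 in the Java client) and taking the result modulo the partition count. A minimal sketch of the idea, using an MD5 stand-in rather than Kafka's actual murmur2:
```typescript
import { createHash } from 'crypto';

// Conceptual sketch of key -> partition mapping. Kafka's default partitioner
// uses murmur2 over the key bytes; MD5 is used here purely to illustrate
// that equal keys always land on the same partition.
function partitionFor(key: string, numPartitions: number): number {
  const digest = createHash('md5').update(key).digest();
  return digest.readUInt32BE(0) % numPartitions;
}

// Compound key: the same tenant+user always maps to the same partition,
// while high key cardinality spreads load across all partitions.
console.log(partitionFor('US:user-123', 12));
console.log(partitionFor('US:user-123', 12)); // identical result
```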
## Replication & High Availability
### Replication Factor Guidelines
```yaml
Development:
Replication Factor: 1
Reason: Fast, no durability needed
Production (Standard):
Replication Factor: 3
Reason: Balance durability vs cost
  Tolerates: 1 broker failure without losing write availability (with min.insync.replicas=2); data survives 2 failures
Production (Critical):
Replication Factor: 5
Reason: Maximum durability
  Tolerates: 2 broker failures without losing write availability (with min.insync.replicas=3); data survives 4 failures
Use Cases: Financial transactions, audit logs
Multi-Datacenter:
Replication Factor: 3 per DC (6 total)
Reason: DC-level fault tolerance
Requires: MirrorMaker 2 or Confluent Replicator
```
### min.insync.replicas
**Configuration**:
```yaml
min.insync.replicas=2:
- At least 2 replicas must acknowledge writes
- Typical for replication.factor=3
- Prevents data loss if 1 broker fails
min.insync.replicas=1:
- Only leader must acknowledge (dangerous!)
- Use only for non-critical topics
min.insync.replicas=3:
- At least 3 replicas must acknowledge
- For replication.factor=5 (critical systems)
```
**Rule**: `min.insync.replicas ≤ replication.factor - 1` (to allow 1 replica failure)
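A minimal kafkajs sketch that applies this pairing (RF=3, `min.insync.replicas=2`) at topic-creation time; the broker address and topic name are placeholders:
```typescript
import { Kafka } from 'kafkajs';

// Sketch: create a topic with the RF=3 / min.insync.replicas=2 pairing
// described above (broker address and topic name are placeholders).
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
await admin.createTopics({
  topics: [{
    topic: 'payments',
    numPartitions: 12,
    replicationFactor: 3,
    configEntries: [{ name: 'min.insync.replicas', value: '2' }],
  }],
});
await admin.disconnect();
```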
### Rack Awareness
```yaml
Configuration:
broker.rack=rack1 # Broker 1
broker.rack=rack2 # Broker 2
broker.rack=rack3 # Broker 3
Benefit:
- Replicas spread across racks
- Survives rack-level failures (power, network)
- Example: Topic with RF=3 → 1 replica per rack
Placement:
Leader: rack1
Follower 1: rack2
Follower 2: rack3
```
## Retention Strategies
### Time-Based Retention
```yaml
Short-Term (Events, Logs):
retention.ms: 86400000 # 1 day
Use Cases: Real-time analytics, monitoring
Medium-Term (Transactions):
retention.ms: 604800000 # 7 days
Use Cases: Standard business events
Long-Term (Audit, Compliance):
retention.ms: 31536000000 # 365 days
Use Cases: Regulatory requirements, event sourcing
Infinite (Event Sourcing):
retention.ms: -1 # Forever
cleanup.policy: compact
Use Cases: Source of truth, state rebuilding
```
### Size-Based Retention
```yaml
retention.bytes: 10737418240 # 10 GB per partition
Combined (Time OR Size):
retention.ms: 604800000 # 7 days
retention.bytes: 107374182400 # 100 GB
# Whichever limit is reached first
```
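Retention can also be changed on an existing topic at runtime. A hedged kafkajs sketch using the admin client to apply the combined time/size retention shown above (topic name and broker address are placeholders):
```typescript
import { Kafka, ConfigResourceTypes } from 'kafkajs';

// Sketch: set combined time + size retention on an existing topic.
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
await admin.alterConfigs({
  validateOnly: false,
  resources: [{
    type: ConfigResourceTypes.TOPIC,
    name: 'events',
    configEntries: [
      { name: 'retention.ms', value: '604800000' },      // 7 days
      { name: 'retention.bytes', value: '107374182400' }, // 100 GB
    ],
  }],
});
await admin.disconnect();
```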
### Compaction (Log Compaction)
```yaml
cleanup.policy: compact
How It Works:
- Keeps only latest value per key
- Deletes old versions
- Preserves full history initially, compacts later
Use Cases:
- Database changelogs (CDC)
- User profile updates
- Configuration management
- State stores
Example:
Before Compaction:
user:123 → {name: "Alice", v:1}
user:123 → {name: "Alice", v:2, email: "alice@ex.com"}
user:123 → {name: "Alice A.", v:3}
After Compaction:
user:123 → {name: "Alice A.", v:3} # Latest only
```
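Two behaviors matter when producing to a compacted topic: the latest value per key wins, and a `null` value acts as a tombstone that eventually deletes the key. A kafkajs sketch, assuming a topic already configured with `cleanup.policy=compact`:
```typescript
import { Kafka } from 'kafkajs';

// Sketch: on a compacted topic, the latest value per key wins and a
// null value (tombstone) eventually removes the key entirely.
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

await producer.connect();
await producer.send({
  topic: 'user-profiles', // assumed to have cleanup.policy=compact
  messages: [
    { key: 'user:123', value: JSON.stringify({ name: 'Alice A.', v: 3 }) },
    { key: 'user:456', value: null }, // tombstone: deletes user:456 after compaction
  ],
});
await producer.disconnect();
```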
## Performance Optimization
### Broker Configuration
```yaml
# Network threads (handle client connections)
num.network.threads: 8 # Increase for high connection count
# I/O threads (disk operations)
num.io.threads: 16 # Set to number of disks × 2
# Replica fetcher threads
num.replica.fetchers: 4 # Increase for many partitions
# Socket buffer sizes
socket.send.buffer.bytes: 1048576 # 1 MB
socket.receive.buffer.bytes: 1048576 # 1 MB
# Log flush (default: OS handles flushing)
log.flush.interval.messages: 10000 # Flush every 10K messages
log.flush.interval.ms: 1000 # Or every 1 second
```
### Producer Optimization
```yaml
High Throughput:
batch.size: 65536 # 64 KB
linger.ms: 100 # Wait 100ms for batching
compression.type: lz4 # Fast compression
acks: 1 # Leader only
Low Latency:
batch.size: 16384 # 16 KB (default)
linger.ms: 0 # Send immediately
compression.type: none
acks: 1
Durability (Exactly-Once):
batch.size: 16384
linger.ms: 10
compression.type: lz4
acks: all
enable.idempotence: true
transactional.id: "producer-1"
```
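For the exactly-once profile, the kafkajs equivalent is an idempotent, transactional producer (topic names and broker address are placeholders; kafkajs enforces `acks=-1` when idempotence is enabled):
```typescript
import { Kafka } from 'kafkajs';

// Sketch of the "Durability (Exactly-Once)" profile with kafkajs:
// idempotence plus a transactional.id; send-and-commit is atomic.
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer({
  transactionalId: 'producer-1',
  idempotent: true,
  maxInFlightRequests: 1,
});

await producer.connect();
const txn = await producer.transaction();
try {
  await txn.send({ topic: 'orders', messages: [{ key: 'order-1', value: 'created' }] });
  await txn.commit();
} catch (err) {
  await txn.abort();
  throw err;
}
```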
### Consumer Optimization
```yaml
High Throughput:
fetch.min.bytes: 1048576 # 1 MB
fetch.max.wait.ms: 500 # Wait 500ms to accumulate
Low Latency:
fetch.min.bytes: 1 # Immediate fetch
fetch.max.wait.ms: 100 # Short wait
Max Parallelism:
# Deploy consumers = number of partitions
# More consumers than partitions = idle consumers
```
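The same trade-off expressed in kafkajs consumer options, where `minBytes` and `maxWaitTimeInMs` play the role of `fetch.min.bytes` and `fetch.max.wait.ms` (group and topic names are placeholders):
```typescript
import { Kafka } from 'kafkajs';

// Sketch: throughput-oriented fetch settings mapped onto kafkajs options.
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({
  groupId: 'analytics',
  minBytes: 1048576,    // ~ fetch.min.bytes: wait for 1 MB batches
  maxWaitTimeInMs: 500, // ~ fetch.max.wait.ms
});

await consumer.connect();
await consumer.subscribe({ topics: ['events'], fromBeginning: false });
await consumer.run({
  eachBatch: async ({ batch }) => {
    console.log(`partition ${batch.partition}: ${batch.messages.length} records`);
  },
});
```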
## Multi-Datacenter Patterns
### Active-Passive (Disaster Recovery)
```yaml
Architecture:
Primary DC: Full Kafka cluster
Secondary DC: Replica cluster (MirrorMaker 2)
Configuration:
- Producers → Primary only
- Consumers → Primary only
- MirrorMaker 2: Primary → Secondary (async replication)
Failover:
1. Detect primary failure
2. Switch producers/consumers to secondary
3. Promote secondary to primary
Recovery Time: 5-30 minutes (manual)
Data Loss: Potential (async replication lag)
```
### Active-Active (Geo-Replication)
```yaml
Architecture:
DC1: Kafka cluster (region A)
DC2: Kafka cluster (region B)
Bidirectional replication via MirrorMaker 2
Configuration:
- Producers → Nearest DC
- Consumers → Nearest DC or both
- Conflict resolution: Last-write-wins or custom
Challenges:
- Duplicate messages (at-least-once delivery)
- Ordering across DCs not guaranteed
- Circular replication prevention
Use Cases:
- Global applications
- Regional compliance (GDPR)
- Load distribution
```
### Stretch Cluster (Synchronous Replication)
```yaml
Architecture:
Single Kafka cluster spanning 2 DCs
Rack awareness: DC1 = rack1, DC2 = rack2
Configuration:
  min.insync.replicas: 3 # forces at least one ack from the remote DC
replication.factor: 4 (2 per DC)
acks: all
Requirements:
- Low latency between DCs (<10ms)
- High bandwidth link (10+ Gbps)
- Dedicated fiber
Trade-offs:
Pros: Synchronous replication, zero data loss
Cons: Latency penalty, network dependency
```
## Monitoring & Observability
### Key Metrics
**Broker Metrics**:
```yaml
UnderReplicatedPartitions:
Alert: > 0 for > 5 minutes
Indicates: Replica lag, broker failure
OfflinePartitionsCount:
Alert: > 0
Indicates: No leader elected (critical!)
ActiveControllerCount:
Alert: != 1 (should be exactly 1)
Indicates: Split brain or no controller
RequestHandlerAvgIdlePercent:
Alert: < 20%
Indicates: Broker CPU saturation
```
**Topic Metrics**:
```yaml
MessagesInPerSec:
Monitor: Throughput trends
Alert: Sudden drops (producer failure)
BytesInPerSec / BytesOutPerSec:
Monitor: Network utilization
Alert: Approaching NIC limits
RecordsLagMax (Consumer):
Alert: > 10000 or growing
Indicates: Consumer can't keep up
```
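Consumer lag (the basis of `RecordsLagMax`) can be derived by diffing a group's committed offsets against the partition high watermarks. A kafkajs sketch, with group and topic names as placeholders:
```typescript
import { Kafka } from 'kafkajs';

// Sketch: per-partition lag = high watermark - committed offset.
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
const [committed, latest] = await Promise.all([
  admin.fetchOffsets({ groupId: 'analytics', topics: ['events'] }),
  admin.fetchTopicOffsets('events'),
]);
for (const { partition, offset } of committed[0].partitions) {
  const watermark = latest.find((p) => p.partition === partition);
  if (watermark) {
    console.log(`partition ${partition} lag: ${Number(watermark.high) - Number(offset)}`);
  }
}
await admin.disconnect();
```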
**Disk Metrics**:
```yaml
LogSegmentSize:
Monitor: Disk usage trends
Alert: > 80% capacity
LogFlushRateAndTimeMs:
Monitor: Disk write latency
Alert: > 100ms p99 (slow disk)
```
## Security Patterns
### Authentication & Authorization
```yaml
SASL/SCRAM-SHA-512:
- Industry standard
- User/password authentication
- Stored in ZooKeeper/KRaft
ACLs (Access Control Lists):
- Per-topic, per-group permissions
- Operations: READ, WRITE, CREATE, DELETE, ALTER
- Example:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
  --allow-principal User:alice \
  --operation READ \
  --topic orders
mTLS (Mutual TLS):
- Certificate-based auth
- Strong cryptographic identity
- Best for service-to-service
```
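The same grant as the `kafka-acls.sh` example above can be issued programmatically. A kafkajs sketch (broker address is a placeholder; a real setup would also pass SASL/SSL options):
```typescript
import {
  Kafka,
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} from 'kafkajs';

// Sketch: allow User:alice to READ topic "orders".
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
await admin.createAcls({
  acl: [{
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'orders',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:alice',
    host: '*',
    operation: AclOperationTypes.READ,
    permissionType: AclPermissionTypes.ALLOW,
  }],
});
await admin.disconnect();
```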
## Integration with SpecWeave
**Automatic Architecture Detection**:
```typescript
import { ClusterSizingCalculator } from './lib/utils/sizing';
const calculator = new ClusterSizingCalculator();
const recommendation = calculator.calculate({
throughputMBps: 200,
retentionDays: 30,
replicationFactor: 3,
topicCount: 100
});
console.log(recommendation);
// {
// brokers: 8,
// partitionsPerBroker: 1500,
// diskPerBroker: 6000 GB,
// ramPerBroker: 64 GB
// }
```
**SpecWeave Commands**:
- `/specweave-kafka:deploy` - Validates cluster sizing before deployment
- `/specweave-kafka:monitor-setup` - Configures metrics for key indicators
## Related Skills
- `/specweave-kafka:kafka-mcp-integration` - MCP server setup
- `/specweave-kafka:kafka-cli-tools` - CLI operations
## External Links
- [Kafka Documentation - Architecture](https://kafka.apache.org/documentation/#design)
- [Confluent - Kafka Sizing](https://www.confluent.io/blog/how-to-choose-the-number-of-topics-partitions-in-a-kafka-cluster/)
- [KRaft Mode Overview](https://kafka.apache.org/documentation/#kraft)
- [LinkedIn Engineering - Kafka at Scale](https://engineering.linkedin.com/kafka/running-kafka-scale)

@@ -0,0 +1,433 @@
---
name: kafka-cli-tools
description: Expert knowledge of Kafka CLI tools (kcat, kcli, kaf, kafkactl). Auto-activates on keywords kcat, kafkacat, kcli, kaf, kafkactl, kafka cli, kafka command line, produce message, consume topic, list topics, kafka metadata. Provides command examples, installation guides, and tool comparisons.
---
# Kafka CLI Tools Expert
Comprehensive knowledge of modern Kafka CLI tools for production operations, development, and troubleshooting.
## Supported CLI Tools
### 1. kcat (kafkacat) - The Swiss Army Knife
**Installation**:
```bash
# macOS
brew install kcat
# Ubuntu/Debian (packaged as kcat on newer releases, kafkacat on older ones)
apt-get install kcat
# From source
git clone https://github.com/edenhill/kcat.git
cd kcat
./configure && make && sudo make install
```
**Core Operations**:
**Produce Messages**:
```bash
# Simple produce
echo "Hello Kafka" | kcat -P -b localhost:9092 -t my-topic
# Produce with key (key:value format)
echo "user123:Login event" | kcat -P -b localhost:9092 -t events -K:
# Produce from file
cat events.json | kcat -P -b localhost:9092 -t events
# Produce with headers
echo "msg" | kcat -P -b localhost:9092 -t my-topic -H "source=app1" -H "version=1.0"
# Produce with compression
echo "data" | kcat -P -b localhost:9092 -t my-topic -z gzip
# Produce with acks=all
echo "critical-data" | kcat -P -b localhost:9092 -t my-topic -X acks=all
```
**Consume Messages**:
```bash
# Consume from beginning
kcat -C -b localhost:9092 -t my-topic -o beginning
# Consume from end (latest)
kcat -C -b localhost:9092 -t my-topic -o end
# Consume specific partition
kcat -C -b localhost:9092 -t my-topic -p 0 -o beginning
# Consume with consumer group
kcat -C -b localhost:9092 -G my-group my-topic
# Consume N messages and exit
kcat -C -b localhost:9092 -t my-topic -c 10
# Custom format (topic:partition:offset:key:value)
kcat -C -b localhost:9092 -t my-topic -f 'Topic: %t, Partition: %p, Offset: %o, Key: %k, Value: %s\n'
# JSON output
kcat -C -b localhost:9092 -t my-topic -J
```
**Metadata & Admin**:
```bash
# List all topics
kcat -L -b localhost:9092
# Get topic metadata (JSON)
kcat -L -b localhost:9092 -t my-topic -J
# Query offset by timestamp (format: topic:partition:timestamp, -1 = latest)
kcat -Q -b localhost:9092 -t my-topic:0:-1
# Check broker health
kcat -L -b localhost:9092 | grep "broker\|topic"
```
**SASL/SSL Authentication**:
```bash
# SASL/PLAINTEXT
kcat -b localhost:9092 \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=PLAIN \
-X sasl.username=admin \
-X sasl.password=admin-secret \
-L
# SASL/SSL
kcat -b localhost:9093 \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=SCRAM-SHA-256 \
-X sasl.username=admin \
-X sasl.password=admin-secret \
-X ssl.ca.location=/path/to/ca-cert \
-L
# mTLS (mutual TLS)
kcat -b localhost:9093 \
-X security.protocol=SSL \
-X ssl.ca.location=/path/to/ca-cert \
-X ssl.certificate.location=/path/to/client-cert.pem \
-X ssl.key.location=/path/to/client-key.pem \
-L
```
### 2. kcli - Kubernetes-Native Kafka CLI
**Installation**:
```bash
# Install via krew (Kubernetes plugin manager)
kubectl krew install kcli
# Or download binary
curl -LO https://github.com/cswank/kcli/releases/latest/download/kcli-linux-amd64
chmod +x kcli-linux-amd64
sudo mv kcli-linux-amd64 /usr/local/bin/kcli
```
**Kubernetes Integration**:
```bash
# Connect to Kafka running in k8s
kcli --context my-cluster --namespace kafka
# Produce to topic in k8s
echo "msg" | kcli produce --topic my-topic --brokers kafka-broker:9092
# Consume from k8s Kafka
kcli consume --topic my-topic --brokers kafka-broker:9092 --from-beginning
# List topics in k8s cluster
kcli topics list --brokers kafka-broker:9092
```
**Best For**:
- Kubernetes-native deployments
- Helmfile/Kustomize workflows
- GitOps with ArgoCD/Flux
### 3. kaf - Modern Terminal UI
**Installation**:
```bash
# macOS
brew install kaf
# Linux (via snap)
snap install kaf
# From source
go install github.com/birdayz/kaf/cmd/kaf@latest
```
**Interactive Features**:
```bash
# Configure cluster
kaf config add-cluster local --brokers localhost:9092
# Use cluster
kaf config use-cluster local
# Interactive topic browsing (TUI)
kaf topics
# Interactive consume (arrow keys to navigate)
kaf consume my-topic
# Produce interactively
kaf produce my-topic
# Consumer group management
kaf groups
kaf group describe my-group
kaf group reset my-group --topic my-topic --offset earliest
# Schema Registry integration
kaf schemas
kaf schema get my-schema
```
**Best For**:
- Development workflows
- Quick topic exploration
- Consumer group debugging
- Schema Registry management
### 4. kafkactl - Advanced Admin Tool
**Installation**:
```bash
# macOS
brew install deviceinsight/packages/kafkactl
# Linux
curl -L https://github.com/deviceinsight/kafkactl/releases/latest/download/kafkactl_linux_amd64 -o kafkactl
chmod +x kafkactl
sudo mv kafkactl /usr/local/bin/
# Via Docker
docker run --rm -it deviceinsight/kafkactl:latest
```
**Advanced Operations**:
```bash
# Configure context
kafkactl config add-context local --brokers localhost:9092
# Topic management
kafkactl create topic my-topic --partitions 3 --replication-factor 2
kafkactl alter topic my-topic --config retention.ms=86400000
kafkactl delete topic my-topic
# Consumer group operations
kafkactl describe consumer-group my-group
kafkactl reset consumer-group my-group --topic my-topic --offset earliest
kafkactl delete consumer-group my-group
# ACL management
kafkactl create acl --allow --principal User:alice --operation READ --topic my-topic
kafkactl list acls
# Quota management
kafkactl alter client-quota --user alice --producer-byte-rate 1048576
# Reassign partitions
kafkactl alter partition --topic my-topic --partition 0 --replicas 1,2,3
```
**Best For**:
- Production cluster management
- ACL administration
- Partition reassignment
- Quota management
## Tool Comparison Matrix
| Feature | kcat | kcli | kaf | kafkactl |
|---------|------|------|-----|----------|
| **Installation** | Easy | Medium | Easy | Easy |
| **Produce** | ✅ Advanced | ✅ Basic | ✅ Interactive | ✅ Basic |
| **Consume** | ✅ Advanced | ✅ Basic | ✅ Interactive | ✅ Basic |
| **Metadata** | ✅ JSON | ✅ Basic | ✅ TUI | ✅ Detailed |
| **TUI** | ❌ | ❌ | ✅ | ✅ Limited |
| **Admin** | ❌ | ❌ | ⚠️ Limited | ✅ Advanced |
| **SASL/SSL** | ✅ | ✅ | ✅ | ✅ |
| **K8s Native** | ❌ | ✅ | ❌ | ❌ |
| **Schema Reg** | ❌ | ❌ | ✅ | ❌ |
| **ACLs** | ❌ | ❌ | ❌ | ✅ |
| **Quotas** | ❌ | ❌ | ❌ | ✅ |
| **Best For** | Scripting, ops | Kubernetes | Development | Production admin |
## Common Patterns
### 1. Topic Creation with Optimal Settings
```bash
# Using kafkactl (recommended for production)
kafkactl create topic orders \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config compression.type=lz4 \
--config min.insync.replicas=2
# Verify with kcat
kcat -L -b localhost:9092 -t orders -J | jq '.topics[0]'
```
### 2. Dead Letter Queue Pattern
```bash
# Produce failed message to DLQ
echo "failed-msg" | kcat -P -b localhost:9092 -t orders-dlq \
-H "original-topic=orders" \
-H "error=DeserializationException" \
-H "timestamp=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
# Monitor DLQ
kcat -C -b localhost:9092 -t orders-dlq -f 'Headers: %h\nValue: %s\n\n'
```
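In application code, the same pattern is a plain produce with diagnostic headers. A kafkajs sketch mirroring the kcat example above:
```typescript
import { Kafka } from 'kafkajs';

// Sketch: route a failed record to the DLQ with the same
// diagnostic headers as the kcat example above.
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

await producer.connect();
await producer.send({
  topic: 'orders-dlq',
  messages: [{
    value: 'failed-msg',
    headers: {
      'original-topic': 'orders',
      error: 'DeserializationException',
      timestamp: new Date().toISOString(),
    },
  }],
});
await producer.disconnect();
```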
### 3. Consumer Group Lag Monitoring
```bash
# Using kafkactl
kafkactl describe consumer-group my-app | grep LAG
# Using kcat + jq to inspect partition metadata (lag itself requires committed offsets)
kcat -L -b localhost:9092 -J | jq '.topics[] | select(.topic=="my-topic") | .partitions'
# Using kaf (interactive)
kaf groups
# Then select group to see lag in TUI
```
### 4. Multi-Cluster Replication Testing
```bash
# Produce to source cluster
echo "test" | kcat -P -b source-kafka:9092 -t replicated-topic
# Consume from target cluster
kcat -C -b target-kafka:9092 -t replicated-topic -o end -c 1
# Compare latest offsets (timestamp -1 = latest)
kcat -Q -b source-kafka:9092 -t replicated-topic:0:-1
kcat -Q -b target-kafka:9092 -t replicated-topic:0:-1
```
### 5. Performance Testing
```bash
# Produce 10,000 messages with kcat
seq 1 10000 | kcat -P -b localhost:9092 -t perf-test
# Consume and measure throughput
time kcat -C -b localhost:9092 -t perf-test -c 10000 -o beginning > /dev/null
# Test with compression
seq 1 10000 | kcat -P -b localhost:9092 -t perf-test -z lz4
```
## Troubleshooting
### Connection Issues
```bash
# Test broker connectivity
kcat -L -b localhost:9092
# Check SSL/TLS connection
openssl s_client -connect localhost:9093 -showcerts
# Verify SASL authentication
kcat -b localhost:9092 \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=PLAIN \
-X sasl.username=admin \
-X sasl.password=wrong-password \
-L
# Should fail with authentication error
```
### Message Not Appearing
```bash
# Check topic exists
kcat -L -b localhost:9092 | grep my-topic
# Check partition count
kcat -L -b localhost:9092 -t my-topic -J | jq '.topics[0].partitions | length'
# Query latest offset per partition (format: topic:partition:timestamp, -1 = latest)
kcat -Q -b localhost:9092 -t my-topic:0:-1
# Consume from all partitions
for i in {0..11}; do
echo "Partition $i:"
kcat -C -b localhost:9092 -t my-topic -p $i -c 1 -o end
done
```
### Consumer Group Stuck
```bash
# Check consumer group state
kafkactl describe consumer-group my-app
# Reset to beginning
kafkactl reset consumer-group my-app --topic my-topic --offset earliest
# Reset to specific offset
kafkactl reset consumer-group my-app --topic my-topic --partition 0 --offset 12345
# Delete consumer group (all consumers must be stopped first)
kafkactl delete consumer-group my-app
```
## Integration with SpecWeave
**Automatic CLI Tool Detection**:
SpecWeave auto-detects installed CLI tools and recommends best tool for the operation:
```typescript
import { CLIToolDetector } from './lib/cli/detector';
const detector = new CLIToolDetector();
const available = await detector.detectAll();
// Recommended tool for produce operation
if (available.includes('kcat')) {
console.log('Use kcat for produce (fastest)');
} else if (available.includes('kaf')) {
console.log('Use kaf for produce (interactive)');
}
```
**SpecWeave Commands**:
- `/specweave-kafka:dev-env` - Uses Docker Compose + kcat for local testing
- `/specweave-kafka:monitor-setup` - Sets up kcat-based lag monitoring
- `/specweave-kafka:mcp-configure` - Validates CLI tools are installed
## Security Best Practices
1. **Never hardcode credentials** - Use environment variables or secrets management
2. **Use SSL/TLS in production** - Configure `-X security.protocol=SASL_SSL`
3. **Prefer SCRAM over PLAIN** - Use `-X sasl.mechanism=SCRAM-SHA-256`
4. **Rotate credentials regularly** - Update passwords and certificates
5. **Least privilege** - Grant only necessary ACLs to users
## Related Skills
- `/specweave-kafka:kafka-mcp-integration` - MCP server setup and configuration
- `/specweave-kafka:kafka-architecture` - Cluster design and sizing
## External Links
- [kcat GitHub](https://github.com/edenhill/kcat)
- [kcli GitHub](https://github.com/cswank/kcli)
- [kaf GitHub](https://github.com/birdayz/kaf)
- [kafkactl GitHub](https://github.com/deviceinsight/kafkactl)
- [Apache Kafka Documentation](https://kafka.apache.org/documentation/)

@@ -0,0 +1,449 @@
---
name: kafka-iac-deployment
description: Infrastructure as Code (IaC) deployment expert for Apache Kafka. Guides Terraform deployments across Apache Kafka (KRaft mode), AWS MSK, Azure Event Hubs. Activates for terraform, iac, infrastructure as code, deploy kafka, provision kafka, aws msk, azure event hubs, kafka infrastructure, terraform modules, cloud deployment, kafka deployment automation.
---
# Kafka Infrastructure as Code (IaC) Deployment
Expert guidance for deploying Apache Kafka using Terraform across multiple platforms.
## When to Use This Skill
I activate when you need help with:
- **Terraform deployments**: "Deploy Kafka with Terraform", "provision Kafka cluster"
- **Platform selection**: "Should I use AWS MSK or self-hosted Kafka?", "compare Kafka platforms"
- **Infrastructure planning**: "How to size Kafka infrastructure", "Kafka on AWS vs Azure"
- **IaC automation**: "Automate Kafka deployment", "CI/CD for Kafka infrastructure"
## What I Know
### Available Terraform Modules
This plugin provides 3 production-ready Terraform modules:
#### 1. **Apache Kafka (Self-Hosted, KRaft Mode)**
- **Location**: `plugins/specweave-kafka/terraform/apache-kafka/`
- **Platform**: AWS EC2 (can adapt to other clouds)
- **Architecture**: KRaft mode (no ZooKeeper dependency)
- **Features**:
- Multi-broker cluster (3-5 brokers recommended)
- Security groups with SASL_SSL
- IAM roles for S3 backups
- CloudWatch metrics and alarms
- Auto-scaling group support
- Custom VPC and subnet configuration
- **Use When**:
- ✅ You need full control over Kafka configuration
- ✅ Running Kafka 3.6+ (KRaft mode)
- ✅ Want to avoid ZooKeeper operational overhead
- ✅ Multi-cloud or hybrid deployments
- **Variables**:
```hcl
module "kafka" {
source = "../../plugins/specweave-kafka/terraform/apache-kafka"
environment = "production"
broker_count = 3
kafka_version = "3.7.0"
instance_type = "m5.xlarge"
vpc_id = var.vpc_id
subnet_ids = var.subnet_ids
domain = "example.com"
enable_s3_backups = true
enable_monitoring = true
}
```
#### 2. **AWS MSK (Managed Streaming for Kafka)**
- **Location**: `plugins/specweave-kafka/terraform/aws-msk/`
- **Platform**: AWS Managed Service
- **Features**:
- Fully managed Kafka service
- IAM authentication + SASL/SCRAM
- Auto-scaling (provisioned throughput)
- Built-in monitoring (CloudWatch)
- Multi-AZ deployment
- Encryption in transit and at rest
- **Use When**:
- ✅ You want AWS to manage Kafka operations
- ✅ Need tight AWS integration (IAM, KMS, CloudWatch)
- ✅ Prefer operational simplicity over cost
- ✅ Running in AWS VPC
- **Variables**:
```hcl
module "msk" {
source = "../../plugins/specweave-kafka/terraform/aws-msk"
cluster_name = "my-kafka-cluster"
kafka_version = "3.6.0"
number_of_broker_nodes = 3
broker_node_instance_type = "kafka.m5.large"
vpc_id = var.vpc_id
subnet_ids = var.private_subnet_ids
enable_iam_auth = true
enable_scram_auth = false
enable_auto_scaling = true
}
```
#### 3. **Azure Event Hubs (Kafka API)**
- **Location**: `plugins/specweave-kafka/terraform/azure-event-hubs/`
- **Platform**: Azure Managed Service
- **Features**:
- Kafka 1.0+ protocol support
- Auto-inflate (elastic scaling)
- Premium SKU for high throughput
- Zone redundancy
- Private endpoints (VNet integration)
- Event capture to Azure Storage
- **Use When**:
- ✅ Running on Azure cloud
- ✅ Need Kafka-compatible API without Kafka operations
- ✅ Want serverless scaling (auto-inflate)
- ✅ Integrating with Azure ecosystem
- **Variables**:
```hcl
module "event_hubs" {
source = "../../plugins/specweave-kafka/terraform/azure-event-hubs"
namespace_name = "my-event-hub-ns"
resource_group_name = var.resource_group_name
location = "eastus"
sku = "Premium"
capacity = 1
kafka_enabled = true
auto_inflate_enabled = true
maximum_throughput_units = 20
}
```
## Platform Selection Decision Tree
```
Need Kafka deployment? START HERE:
├─ Running on AWS?
│ ├─ YES → Want managed service?
│ │ ├─ YES → Use AWS MSK module (terraform/aws-msk)
│ │ └─ NO → Use Apache Kafka module (terraform/apache-kafka)
│ └─ NO → Continue...
├─ Running on Azure?
│ ├─ YES → Use Azure Event Hubs module (terraform/azure-event-hubs)
│ └─ NO → Continue...
├─ Multi-cloud or hybrid?
│ └─ YES → Use Apache Kafka module (most portable)
├─ Need maximum control?
│ └─ YES → Use Apache Kafka module
└─ Default → Use Apache Kafka module (self-hosted, KRaft mode)
```
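The tree can also be encoded as a small helper for automation; this is an illustrative sketch, with the type and function names invented here:
```typescript
// Hypothetical helper encoding the decision tree above.
type KafkaPlatform = 'aws-msk' | 'azure-event-hubs' | 'apache-kafka';

interface PlatformNeeds {
  cloud: 'aws' | 'azure' | 'other';
  wantManaged: boolean; // hand operations to the cloud provider?
  multiCloud: boolean;  // multi-cloud or hybrid footprint?
}

function choosePlatform(needs: PlatformNeeds): KafkaPlatform {
  if (needs.multiCloud) return 'apache-kafka'; // most portable
  if (needs.cloud === 'aws') return needs.wantManaged ? 'aws-msk' : 'apache-kafka';
  if (needs.cloud === 'azure') return 'azure-event-hubs';
  return 'apache-kafka'; // default: self-hosted, KRaft mode
}

console.log(choosePlatform({ cloud: 'aws', wantManaged: true, multiCloud: false })); // "aws-msk"
```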
## Deployment Workflows
### Workflow 1: Deploy Self-Hosted Kafka (Apache Kafka Module)
**Scenario**: You want full control over Kafka on AWS EC2
```bash
# 1. Create Terraform configuration
cat > main.tf <<EOF
module "kafka_cluster" {
source = "../../plugins/specweave-kafka/terraform/apache-kafka"
environment = "production"
broker_count = 3
kafka_version = "3.7.0"
instance_type = "m5.xlarge"
vpc_id = "vpc-12345678"
subnet_ids = ["subnet-abc", "subnet-def", "subnet-ghi"]
domain = "kafka.example.com"
enable_s3_backups = true
enable_monitoring = true
tags = {
Project = "MyApp"
Environment = "Production"
}
}
output "broker_endpoints" {
value = module.kafka_cluster.broker_endpoints
}
EOF
# 2. Initialize Terraform
terraform init
# 3. Plan deployment (review what will be created)
terraform plan
# 4. Apply (create infrastructure)
terraform apply
# 5. Get broker endpoints
terraform output broker_endpoints
# Output: ["kafka-0.kafka.example.com:9093", "kafka-1.kafka.example.com:9093", ...]
```
### Workflow 2: Deploy AWS MSK (Managed Service)
**Scenario**: You want AWS to manage Kafka operations
```bash
# 1. Create Terraform configuration
cat > main.tf <<EOF
module "msk_cluster" {
source = "../../plugins/specweave-kafka/terraform/aws-msk"
cluster_name = "my-msk-cluster"
kafka_version = "3.6.0"
number_of_broker_nodes = 3
broker_node_instance_type = "kafka.m5.large"
vpc_id = var.vpc_id
subnet_ids = var.private_subnet_ids
enable_iam_auth = true
enable_auto_scaling = true
tags = {
Project = "MyApp"
}
}
output "bootstrap_brokers" {
value = module.msk_cluster.bootstrap_brokers_sasl_iam
}
EOF
# 2. Deploy
terraform init && terraform apply
# 3. Configure IAM authentication
# (module outputs IAM policy, attach to your application role)
```
### Workflow 3: Deploy Azure Event Hubs (Kafka API)
**Scenario**: You're on Azure and want Kafka-compatible API
```bash
# 1. Create Terraform configuration
cat > main.tf <<EOF
module "event_hubs" {
source = "../../plugins/specweave-kafka/terraform/azure-event-hubs"
namespace_name = "my-kafka-namespace"
resource_group_name = "my-resource-group"
location = "eastus"
sku = "Premium"
capacity = 1
kafka_enabled = true
auto_inflate_enabled = true
maximum_throughput_units = 20
# Create hubs (topics) for your use case
hubs = [
{ name = "user-events", partitions = 12 },
{ name = "order-events", partitions = 6 },
{ name = "payment-events", partitions = 3 }
]
}
output "connection_string" {
value = module.event_hubs.connection_string
sensitive = true
}
EOF
# 2. Deploy
terraform init && terraform apply
# 3. Get connection details
terraform output connection_string
```
## Infrastructure Sizing Recommendations
### Small Environment (Dev/Test)
```hcl
# Self-hosted: 1 broker, m5.large
broker_count = 1
instance_type = "m5.large"
# AWS MSK: 1 broker per AZ, kafka.m5.large
number_of_broker_nodes = 3
broker_node_instance_type = "kafka.m5.large"
# Azure Event Hubs: Basic SKU
sku = "Basic"
capacity = 1
```
### Medium Environment (Staging/Production)
```hcl
# Self-hosted: 3 brokers, m5.xlarge
broker_count = 3
instance_type = "m5.xlarge"
# AWS MSK: 3 brokers, kafka.m5.xlarge
number_of_broker_nodes = 3
broker_node_instance_type = "kafka.m5.xlarge"
# Azure Event Hubs: Standard SKU with auto-inflate
sku = "Standard"
capacity = 2
auto_inflate_enabled = true
maximum_throughput_units = 10
```
### Large Environment (High-Throughput Production)
```hcl
# Self-hosted: 5+ brokers, m5.2xlarge or m5.4xlarge
broker_count = 5
instance_type = "m5.2xlarge"
# AWS MSK: 6+ brokers, kafka.m5.2xlarge, auto-scaling
number_of_broker_nodes = 6
broker_node_instance_type = "kafka.m5.2xlarge"
enable_auto_scaling = true
# Azure Event Hubs: Premium SKU with zone redundancy
sku = "Premium"
capacity = 4
zone_redundant = true
maximum_throughput_units = 20
```
## Best Practices
### Security Best Practices
1. **Always use encryption in transit**
- Self-hosted: Enable SASL_SSL listener
- AWS MSK: Set `encryption_in_transit_client_broker = "TLS"`
- Azure Event Hubs: HTTPS/TLS enabled by default
2. **Use IAM authentication (when possible)**
- AWS MSK: `enable_iam_auth = true`
- Azure Event Hubs: Managed identities
3. **Network isolation**
- Deploy in private subnets
- Use security groups/NSGs restrictively
- Azure: Enable private endpoints for Premium SKU
### High Availability Best Practices
1. **Multi-AZ deployment**
- Self-hosted: Distribute brokers across 3+ AZs
- AWS MSK: Automatically multi-AZ
- Azure Event Hubs: Enable `zone_redundant = true` (Premium)
2. **Replication factor = 3**
- Self-hosted: `default.replication.factor=3`
- AWS MSK: Configured automatically
- Azure Event Hubs: N/A (fully managed)
3. **min.insync.replicas = 2**
- Ensures durability even if 1 broker fails
### Cost Optimization
1. **Right-size instances**
- Use ClusterSizingCalculator utility (in kafka-architecture skill)
- Start small, scale up based on metrics
2. **Auto-scaling (where available)**
- AWS MSK: `enable_auto_scaling = true`
- Azure Event Hubs: `auto_inflate_enabled = true`
3. **Retention policies**
- Set `log.retention.hours` based on actual needs (default: 168 hours = 7 days)
- Shorter retention = lower storage costs
## Monitoring Integration
All modules integrate with monitoring:
### Self-Hosted Kafka
- CloudWatch metrics (via JMX Exporter)
- Prometheus + Grafana dashboards (see kafka-observability skill)
- Custom CloudWatch alarms
### AWS MSK
- Built-in CloudWatch metrics
- Enhanced monitoring available
- Integration with CloudWatch Alarms
### Azure Event Hubs
- Built-in Azure Monitor metrics
- Diagnostic logs to Log Analytics
- Integration with Azure Alerts
## Troubleshooting
### "Terraform destroy fails on security groups"
**Cause**: Resources using security groups still exist
**Fix**:
```bash
# 1. Find dependent resources
aws ec2 describe-network-interfaces --filters "Name=group-id,Values=sg-12345678"
# 2. Delete dependent resources first
# 3. Retry terraform destroy
```
### "AWS MSK cluster takes 20+ minutes to create"
**Cause**: MSK provisioning is inherently slow (AWS behavior)
**Fix**: This is normal; MSK provisioning typically takes 15-30 minutes. For unattended runs, skip the interactive prompt:
```bash
terraform apply -auto-approve
```
### "Azure Event Hubs: Connection refused"
**Cause**: Kafka protocol not enabled OR incorrect connection string
**Fix**:
1. Verify `kafka_enabled = true` in Terraform
2. Use Kafka connection string (not Event Hubs connection string)
3. Check firewall rules (Premium SKU supports private endpoints)
## Integration with Other Skills
- **kafka-architecture**: For cluster sizing and partitioning strategy
- **kafka-observability**: For Prometheus + Grafana setup after deployment
- **kafka-kubernetes**: For deploying Kafka on Kubernetes (alternative to Terraform)
- **kafka-cli-tools**: For testing deployed clusters with kcat
## Quick Reference Commands
```bash
# Terraform workflow
terraform init # Initialize modules
terraform plan # Preview changes
terraform apply # Create infrastructure
terraform output # Get outputs (endpoints, etc.)
terraform destroy # Delete infrastructure
# AWS MSK specific
aws kafka list-clusters # List MSK clusters
aws kafka describe-cluster --cluster-arn <arn> # Get cluster details
# Azure Event Hubs specific
az eventhubs namespace list # List namespaces
az eventhubs eventhub list --namespace-name <name> --resource-group <rg> # List hubs
```
---
**Next Steps After Deployment**:
1. Use **kafka-observability** skill to set up Prometheus + Grafana monitoring
2. Use **kafka-cli-tools** skill to test cluster with kcat
3. Deploy your producer/consumer applications
4. Monitor cluster health and performance

@@ -0,0 +1,667 @@
---
name: kafka-kubernetes
description: Kubernetes deployment expert for Apache Kafka. Guides K8s deployments using Helm charts, operators (Strimzi, Confluent), StatefulSets, and production best practices. Activates for kubernetes, k8s, helm, kafka on kubernetes, strimzi, confluent operator, kafka operator, statefulset, kafka helm chart, k8s deployment, kubernetes kafka, deploy kafka to k8s.
---
# Kafka on Kubernetes Deployment
Expert guidance for deploying Apache Kafka on Kubernetes using industry-standard tools.
## When to Use This Skill
I activate when you need help with:
- **Kubernetes deployments**: "Deploy Kafka on Kubernetes", "run Kafka in K8s", "Kafka Helm chart"
- **Operator selection**: "Strimzi vs Confluent Operator", "which Kafka operator to use"
- **StatefulSet patterns**: "Kafka StatefulSet best practices", "persistent volumes for Kafka"
- **Production K8s**: "Production-ready Kafka on K8s", "Kafka high availability in Kubernetes"
## What I Know
### Deployment Options Comparison
| Approach | Difficulty | Production-Ready | Best For |
|----------|-----------|------------------|----------|
| **Strimzi Operator** | Easy | ✅ Yes | Self-managed Kafka on K8s, CNCF project |
| **Confluent Operator** | Medium | ✅ Yes | Enterprise features, Confluent ecosystem |
| **Bitnami Helm Chart** | Easy | ⚠️ Mostly | Quick dev/staging environments |
| **Custom StatefulSet** | Hard | ⚠️ Requires expertise | Full control, custom requirements |
**Recommendation**: **Strimzi Operator** for most production use cases (CNCF project, active community, KRaft support)
## Deployment Approach 1: Strimzi Operator (Recommended)
**Strimzi** is a CNCF incubating project providing Kubernetes operators for Apache Kafka.
### Features
- ✅ KRaft mode support (Kafka 3.6+, no ZooKeeper)
- ✅ Declarative Kafka management (CRDs)
- ✅ Automatic rolling upgrades
- ✅ Built-in monitoring (Prometheus metrics)
- ✅ Mirror Maker 2 for replication
- ✅ Kafka Connect integration
- ✅ User and topic management via CRDs
### Installation (Helm)
```bash
# 1. Add Strimzi Helm repository
helm repo add strimzi https://strimzi.io/charts/
helm repo update
# 2. Create namespace
kubectl create namespace kafka
# 3. Install Strimzi Operator
helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
--namespace kafka \
--set watchNamespaces="{kafka}" \
--version 0.39.0
# 4. Verify operator is running
kubectl get pods -n kafka
# Output: strimzi-cluster-operator-... Running
```
### Deploy Kafka Cluster (KRaft Mode)
```yaml
# kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: kafka-pool
namespace: kafka
labels:
strimzi.io/cluster: my-kafka-cluster
spec:
replicas: 3
roles:
- controller
- broker
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
class: fast-ssd
deleteClaim: false
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-kafka-cluster
namespace: kafka
annotations:
strimzi.io/kraft: enabled
strimzi.io/node-pools: enabled
spec:
kafka:
version: 3.7.0
metadataVersion: 3.7-IV4
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: loadbalancer
tls: true
authentication:
type: tls
config:
default.replication.factor: 3
min.insync.replicas: 2
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
auto.create.topics.enable: false
log.retention.hours: 168
log.segment.bytes: 1073741824
compression.type: lz4
resources:
requests:
memory: 4Gi
cpu: "2"
limits:
memory: 8Gi
cpu: "4"
jvmOptions:
-Xms: 2048m
-Xmx: 4096m
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: kafka-metrics
key: kafka-metrics-config.yml
```
```bash
# Apply Kafka cluster
kubectl apply -f kafka-cluster.yaml
# Wait for cluster to be ready (5-10 minutes)
kubectl wait kafka/my-kafka-cluster --for=condition=Ready --timeout=600s -n kafka
# Check status
kubectl get kafka -n kafka
# Output: my-kafka-cluster 3.7.0 3 True
```
### Create Topics (Declaratively)
```yaml
# kafka-topics.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: user-events
namespace: kafka
labels:
strimzi.io/cluster: my-kafka-cluster
spec:
partitions: 12
replicas: 3
config:
retention.ms: 604800000 # 7 days
segment.bytes: 1073741824
compression.type: lz4
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: order-events
namespace: kafka
labels:
strimzi.io/cluster: my-kafka-cluster
spec:
partitions: 6
replicas: 3
config:
retention.ms: 2592000000 # 30 days
min.insync.replicas: 2
```
```bash
# Apply topics
kubectl apply -f kafka-topics.yaml
# Verify topics created
kubectl get kafkatopics -n kafka
```
### Create Users (Declaratively)
```yaml
# kafka-users.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-producer
namespace: kafka
labels:
strimzi.io/cluster: my-kafka-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
- resource:
type: topic
name: user-events
patternType: literal
operations: [Write, Describe]
- resource:
type: topic
name: order-events
patternType: literal
operations: [Write, Describe]
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-consumer
namespace: kafka
labels:
strimzi.io/cluster: my-kafka-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
- resource:
type: topic
name: user-events
patternType: literal
operations: [Read, Describe]
- resource:
type: group
name: my-consumer-group
patternType: literal
operations: [Read]
```
```bash
# Apply users
kubectl apply -f kafka-users.yaml
# Get user credentials (TLS certificates)
kubectl get secret my-producer -n kafka -o jsonpath='{.data.user\.crt}' | base64 -d > producer.crt
kubectl get secret my-producer -n kafka -o jsonpath='{.data.user\.key}' | base64 -d > producer.key
kubectl get secret my-kafka-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
```
## Deployment Approach 2: Confluent Operator
**Confluent for Kubernetes (CFK)** provides enterprise-grade Kafka management.
### Features
- ✅ Full Confluent Platform (Kafka, Schema Registry, ksqlDB, Connect)
- ✅ Hybrid deployments (K8s + on-prem)
- ✅ Rolling upgrades with zero downtime
- ✅ Multi-region replication
- ✅ Advanced security (RBAC, encryption)
- ⚠️ Requires Confluent Platform license (paid)
### Installation
```bash
# 1. Add Confluent Helm repository
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
# 2. Create namespace
kubectl create namespace confluent
# 3. Install Confluent Operator
helm install confluent-operator confluentinc/confluent-for-kubernetes \
--namespace confluent \
--version 0.921.11
# 4. Verify
kubectl get pods -n confluent
```
### Deploy Kafka Cluster
```yaml
# kafka-cluster-confluent.yaml
apiVersion: platform.confluent.io/v1beta1
kind: Kafka
metadata:
name: kafka
namespace: confluent
spec:
replicas: 3
image:
application: confluentinc/cp-server:7.6.0
init: confluentinc/confluent-init-container:2.7.0
dataVolumeCapacity: 100Gi
storageClass:
name: fast-ssd
metricReporter:
enabled: true
listeners:
internal:
authentication:
type: plain
tls:
enabled: true
external:
authentication:
type: plain
tls:
enabled: true
dependencies:
zookeeper:
endpoint: zookeeper.confluent.svc.cluster.local:2181
podTemplate:
resources:
requests:
memory: 4Gi
cpu: 2
limits:
memory: 8Gi
cpu: 4
```
```bash
# Apply Kafka cluster
kubectl apply -f kafka-cluster-confluent.yaml
# Wait for cluster
kubectl wait kafka/kafka --for=condition=Ready --timeout=600s -n confluent
```
## Deployment Approach 3: Bitnami Helm Chart (Dev/Staging)
**Bitnami Helm Chart** is simple but less suitable for production.
### Installation
```bash
# 1. Add Bitnami repository
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
# 2. Install Kafka (KRaft mode)
helm install kafka bitnami/kafka \
--namespace kafka \
--create-namespace \
--set kraft.enabled=true \
--set controller.replicaCount=3 \
--set broker.replicaCount=3 \
--set persistence.size=100Gi \
--set persistence.storageClass=fast-ssd \
--set metrics.kafka.enabled=true \
--set metrics.jmx.enabled=true
# 3. Get bootstrap servers
export KAFKA_BOOTSTRAP=$(kubectl get svc kafka -n kafka -o jsonpath='{.status.loadBalancer.ingress[0].hostname}'):9092
```
**Limitations**:
- ⚠️ Less production-ready than Strimzi/Confluent
- ⚠️ Limited declarative topic/user management
- ⚠️ Fewer advanced features (no MirrorMaker 2, limited RBAC)
## Production Best Practices
### 1. Storage Configuration
**Use SSD-backed storage classes** for Kafka logs:
```yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: fast-ssd
provisioner: ebs.csi.aws.com # AWS EBS CSI driver, required for gp3 (or pd.csi.storage.gke.io for GKE)
parameters:
type: gp3 # AWS EBS GP3 (or io2 for extreme performance)
iopsPerGB: "50"
throughput: "125"
allowVolumeExpansion: true
volumeBindingMode: WaitForFirstConsumer
```
**Kafka storage requirements**:
- **Min IOPS**: 3000+ per broker
- **Min Throughput**: 125 MB/s per broker
- **Persistent**: Use `deleteClaim: false` (PVCs are kept if the Kafka cluster is deleted)
### 2. Resource Limits
```yaml
resources:
requests:
memory: 4Gi
cpu: "2"
limits:
memory: 8Gi
cpu: "4"
jvmOptions:
-Xms: 2048m # Initial heap (50% of memory request)
-Xmx: 4096m # Max heap (50% of memory limit, leave room for OS cache)
```
**Sizing guidelines**:
- **Small (dev)**: 2 CPU, 4Gi memory
- **Medium (staging)**: 4 CPU, 8Gi memory
- **Large (production)**: 8 CPU, 16Gi memory
### 3. Pod Disruption Budgets
Ensure high availability during K8s upgrades:
```yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: kafka-pdb
namespace: kafka
spec:
maxUnavailable: 1
selector:
matchLabels:
app.kubernetes.io/name: kafka
```
### 4. Affinity Rules
**Spread brokers across availability zones**:
```yaml
spec:
kafka:
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: strimzi.io/name
operator: In
values:
- my-kafka-cluster-kafka
topologyKey: topology.kubernetes.io/zone
```
### 5. Network Policies
**Restrict access to Kafka brokers**:
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: kafka-network-policy
namespace: kafka
spec:
podSelector:
matchLabels:
strimzi.io/name: my-kafka-cluster-kafka
policyTypes:
- Ingress
ingress:
- from:
- podSelector:
matchLabels:
app: my-producer
- podSelector:
matchLabels:
app: my-consumer
ports:
- protocol: TCP
port: 9092
- protocol: TCP
port: 9093
```
## Monitoring Integration
### Prometheus + Grafana Setup
Strimzi provides built-in Prometheus metrics exporter:
```yaml
# kafka-metrics-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-metrics
namespace: kafka
data:
kafka-metrics-config.yml: |
# Use JMX Exporter config from:
# plugins/specweave-kafka/monitoring/prometheus/kafka-jmx-exporter.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
whitelistObjectNames:
- "kafka.server:type=BrokerTopicMetrics,name=*"
# ... (copy from kafka-jmx-exporter.yml)
```
```bash
# Apply metrics config
kubectl apply -f kafka-metrics-configmap.yaml
# Install Prometheus Operator (if not already installed)
helm install prometheus prometheus-community/kube-prometheus-stack \
--namespace monitoring \
--create-namespace
# Create PodMonitor for Kafka
kubectl apply -f - <<EOF
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: kafka-metrics
namespace: kafka
spec:
selector:
matchLabels:
strimzi.io/kind: Kafka
podMetricsEndpoints:
- port: tcp-prometheus
interval: 30s
EOF
# Access Grafana dashboards (from kafka-observability skill)
kubectl port-forward -n monitoring svc/prometheus-grafana 3000:80
# Open: http://localhost:3000
# Dashboards: Kafka Cluster Overview, Broker Metrics, Consumer Lag, Topic Metrics, JVM Metrics
```
## Troubleshooting
### "Pods stuck in Pending state"
**Cause**: Insufficient resources or storage class not found
**Fix**:
```bash
# Check events
kubectl describe pod my-kafka-cluster-kafka-pool-0 -n kafka
# Check storage class exists
kubectl get storageclass
# If missing, create fast-ssd storage class (see Production Best Practices above)
```
### "Kafka broker not ready after 10 minutes"
**Cause**: Slow storage provisioning or resource limits too low
**Fix**:
```bash
# Check broker logs
kubectl logs my-kafka-cluster-kafka-pool-0 -n kafka
# Common issues:
# 1. Low IOPS on storage → Use GP3 or better
# 2. Low memory → Increase resources.requests.memory
# 3. KRaft quorum not formed → Check all brokers are running
```
### "Cannot connect to Kafka from outside K8s"
**Cause**: External listener not configured
**Fix**:
```yaml
# Add external listener (Strimzi)
spec:
kafka:
listeners:
- name: external
port: 9094
type: loadbalancer
tls: true
authentication:
type: tls
# Get external bootstrap server
kubectl get kafka my-kafka-cluster -n kafka -o jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}'
```
## Scaling Operations
### Horizontal Scaling (Add Brokers)
```bash
# Strimzi: Update KafkaNodePool replicas
kubectl patch kafkanodepool kafka-pool -n kafka --type='json' \
-p='[{"op": "replace", "path": "/spec/replicas", "value": 5}]'
# Confluent: Update Kafka CR
kubectl patch kafka kafka -n confluent --type='json' \
-p='[{"op": "replace", "path": "/spec/replicas", "value": 5}]'
# Wait for new brokers (Strimzi manages pods via StrimziPodSets, not StatefulSets)
kubectl get pods -n kafka -l strimzi.io/cluster=my-kafka-cluster -w
```
### Vertical Scaling (Change Resources)
```bash
# Update resources in Kafka CR
kubectl patch kafka my-kafka-cluster -n kafka --type='json' \
-p='[
{"op": "replace", "path": "/spec/kafka/resources/requests/memory", "value": "8Gi"},
{"op": "replace", "path": "/spec/kafka/resources/requests/cpu", "value": "4"}
]'
# Rolling restart will happen automatically
```
## Integration with Other Skills
- **kafka-iac-deployment**: Alternative to K8s (use Terraform for cloud-managed Kafka)
- **kafka-observability**: Set up Prometheus + Grafana dashboards for K8s Kafka
- **kafka-architecture**: Cluster sizing and partitioning strategy
- **kafka-cli-tools**: Test K8s Kafka cluster with kcat
## Quick Reference Commands
```bash
# Strimzi
kubectl get kafka -n kafka # List Kafka clusters
kubectl get kafkatopics -n kafka # List topics
kubectl get kafkausers -n kafka # List users
kubectl logs my-kafka-cluster-kafka-pool-0 -n kafka # Check broker logs
# Confluent
kubectl get kafka -n confluent # List Kafka clusters
kubectl get schemaregistry -n confluent # List Schema Registry
kubectl get ksqldb -n confluent # List ksqlDB
# Port-forward for testing
kubectl port-forward -n kafka svc/my-kafka-cluster-kafka-bootstrap 9092:9092
```
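With the port-forward active, a minimal kafkajs smoke test confirms connectivity (assumes the plain internal listener on 9092; TLS listeners additionally need `ssl` and client certificates):
```typescript
import { Kafka } from 'kafkajs';

// Sketch: smoke-test the cluster through the port-forward above.
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
console.log('topics:', await admin.listTopics());
await admin.disconnect();
```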
---
**Next Steps After K8s Deployment**:
1. Use **kafka-observability** skill to verify Prometheus metrics and Grafana dashboards
2. Use **kafka-cli-tools** skill to test cluster with kcat
3. Deploy your producer/consumer applications to K8s
4. Set up GitOps for declarative topic/user management (ArgoCD, Flux)

@@ -0,0 +1,290 @@
---
name: kafka-mcp-integration
description: MCP server integration for Kafka operations. Auto-activates on keywords kafka mcp, mcp server, mcp configure, mcp setup, kanapuli, tuannvm, confluent mcp, kafka integration. Provides configuration examples and connection guidance for all 4 MCP servers.
---
# Kafka MCP Server Integration
Expert knowledge for integrating SpecWeave with Kafka MCP (Model Context Protocol) servers. Supports 4 MCP server implementations with auto-detection and configuration guidance.
---
> **Code-First Recommendation**: For most Kafka automation tasks, [writing code is better than MCP](https://www.anthropic.com/engineering/code-execution-with-mcp) (98% token reduction). Use **kafkajs** or **kafka-node** directly:
>
> ```typescript
> import { Kafka } from 'kafkajs';
> const kafka = new Kafka({ brokers: ['localhost:9092'] });
> const producer = kafka.producer();
> await producer.connect();
> await producer.send({ topic: 'events', messages: [{ value: 'Hello' }] });
> ```
>
> **When MCP IS useful**: Quick interactive debugging, topic exploration, Claude Desktop integration.
>
> **When to use code instead**: CI/CD pipelines, test automation, production scripts, anything that should be committed and reusable.
---
## Supported MCP Servers
### 1. kanapuli/mcp-kafka (Node.js)
**Installation**:
```bash
npm install -g mcp-kafka
```
**Capabilities**:
- Authentication: SASL_PLAINTEXT, PLAINTEXT
- Operations: produce, consume, list-topics, describe-topic, get-offsets
- Best for: Basic Kafka operations, quick prototyping
**Configuration Example**:
```json
{
"mcpServers": {
"kafka": {
"command": "npx",
"args": ["mcp-kafka"],
"env": {
"KAFKA_BROKERS": "localhost:9092",
"KAFKA_SASL_MECHANISM": "plain",
"KAFKA_SASL_USERNAME": "user",
"KAFKA_SASL_PASSWORD": "password"
}
}
}
}
```
### 2. tuannvm/kafka-mcp-server (Go)
**Installation**:
```bash
go install github.com/tuannvm/kafka-mcp-server@latest
```
**Capabilities**:
- Authentication: SASL_SCRAM_SHA_256, SASL_SCRAM_SHA_512, SASL_SSL, PLAINTEXT
- Operations: All CRUD operations, consumer group management, offset management
- Best for: Production use, advanced SASL authentication
**Configuration Example**:
```json
{
"mcpServers": {
"kafka": {
"command": "kafka-mcp-server",
"args": [
"--brokers", "localhost:9092",
"--sasl-mechanism", "SCRAM-SHA-256",
"--sasl-username", "admin",
"--sasl-password", "admin-secret"
]
}
}
}
```
### 3. Joel-hanson/kafka-mcp-server (Python)
**Installation**:
```bash
pip install kafka-mcp-server
```
**Capabilities**:
- Authentication: SASL_PLAINTEXT, PLAINTEXT, SSL
- Operations: produce, consume, list-topics, describe-topic
- Best for: Claude Desktop integration, Python ecosystem
**Configuration Example**:
```json
{
"mcpServers": {
"kafka": {
"command": "python",
"args": ["-m", "kafka_mcp_server"],
"env": {
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
}
}
}
}
```
### 4. Confluent Official MCP (Enterprise)
**Installation**:
```bash
confluent plugin install mcp-server
```
**Capabilities**:
- Authentication: OAuth, SASL_SCRAM, API Keys
- Operations: All Kafka operations, Schema Registry, ksqlDB, Flink SQL
- Advanced: Natural language interface, AI-powered query generation
- Best for: Confluent Cloud, enterprise deployments
**Configuration Example**:
```json
{
"mcpServers": {
"kafka": {
"command": "confluent",
"args": ["mcp", "start"],
"env": {
"CONFLUENT_CLOUD_API_KEY": "your-api-key",
"CONFLUENT_CLOUD_API_SECRET": "your-api-secret"
}
}
}
}
```
## Auto-Detection
SpecWeave can auto-detect installed MCP servers:
```bash
/specweave-kafka:mcp-configure
```
This command:
1. Scans for installed MCP servers (npm, go, pip, confluent CLI)
2. Checks which servers are currently running
3. Ranks servers by capabilities (Confluent > tuannvm > kanapuli > Joel-hanson)
4. Generates recommended configuration
5. Tests connection
## Quick Start
### Option 1: Auto-Configure (Recommended)
```bash
/specweave-kafka:mcp-configure
```
Interactive wizard guides you through:
- MCP server selection (or auto-detect)
- Broker URL configuration
- Authentication setup
- Connection testing
### Option 2: Manual Configuration
1. **Install preferred MCP server** (see installation commands above)
2. **Create `.mcp.json` configuration**:
```json
{
"serverType": "tuannvm",
"brokerUrls": ["localhost:9092"],
"authentication": {
"mechanism": "SASL/SCRAM-SHA-256",
"username": "admin",
"password": "admin-secret"
}
}
```
3. **Test connection**:
```bash
# Via MCP server CLI
kafka-mcp-server test-connection
# Or via SpecWeave
node -e "import('./dist/lib/mcp/detector.js').then(async ({ MCPServerDetector }) => {
const detector = new MCPServerDetector();
const result = await detector.detectAll();
console.log(JSON.stringify(result, null, 2));
});"
```
## MCP Server Comparison
| Feature | kanapuli | tuannvm | Joel-hanson | Confluent |
|---------|----------|---------|-------------|-----------|
| **Language** | Node.js | Go | Python | Official CLI |
| **SASL_PLAINTEXT** | ✅ | ✅ | ✅ | ✅ |
| **SCRAM-SHA-256** | ❌ | ✅ | ❌ | ✅ |
| **SCRAM-SHA-512** | ❌ | ✅ | ❌ | ✅ |
| **mTLS/SSL** | ❌ | ✅ | ✅ | ✅ |
| **OAuth** | ❌ | ❌ | ❌ | ✅ |
| **Consumer Groups** | ❌ | ✅ | ❌ | ✅ |
| **Offset Mgmt** | ❌ | ✅ | ❌ | ✅ |
| **Schema Registry** | ❌ | ❌ | ❌ | ✅ |
| **ksqlDB** | ❌ | ❌ | ❌ | ✅ |
| **Flink SQL** | ❌ | ❌ | ❌ | ✅ |
| **AI/NL Interface** | ❌ | ❌ | ❌ | ✅ |
| **Best For** | Prototyping | Production | Desktop | Enterprise |
## Troubleshooting
### MCP Server Not Detected
```bash
# Check if MCP server installed
npm list -g mcp-kafka # kanapuli
which kafka-mcp-server # tuannvm
pip show kafka-mcp-server # Joel-hanson
confluent version # Confluent
```
### Connection Refused
- Verify Kafka broker is running: `kcat -L -b localhost:9092`
- Check firewall rules
- Validate broker URL (correct host:port)
### Authentication Failed
- Double-check credentials (username, password, API keys)
- Verify SASL mechanism matches broker configuration
- Check broker logs for authentication errors
### Operations Not Working
- Ensure MCP server supports the operation (see comparison table)
- Check broker ACLs (permissions for the authenticated user)
- Verify topic exists: `/specweave-kafka:mcp-configure list-topics`
## Operations via MCP
Once an MCP server is configured, SpecWeave first selects the best available server; the detection result tells you which server will handle operations:
```typescript
import { MCPServerDetector } from './lib/mcp/detector';
const detector = new MCPServerDetector();
const result = await detector.detectAll();
// Use recommended server
if (result.recommended) {
console.log(`Using ${result.recommended} MCP server`);
console.log(`Reason: ${result.rankingReason}`);
}
```
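To drive an operation through a server yourself, you can call its tools with the MCP TypeScript SDK. A hedged sketch, assuming the kanapuli server and its documented `list-topics` tool; exact argument shapes vary by server:
```typescript
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';

// Spawn the kanapuli server over stdio, matching its configuration above
const transport = new StdioClientTransport({
  command: 'npx',
  args: ['mcp-kafka'],
  env: { KAFKA_BROKERS: 'localhost:9092' },
});

const client = new Client({ name: 'specweave-kafka', version: '1.0.0' });
await client.connect(transport);

// Tool name matches the operations listed for kanapuli/mcp-kafka above
const result = await client.callTool({ name: 'list-topics', arguments: {} });
console.log(result);
await client.close();
```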
## Security Best Practices
1. **Never commit credentials** - Use environment variables or a secrets manager (see the sketch below)
2. **Use strongest auth** - Prefer SCRAM-SHA-512 > SCRAM-SHA-256 > PLAINTEXT
3. **Enable TLS/SSL** - Encrypt communication with broker
4. **Rotate credentials** - Regularly update passwords and API keys
5. **Least privilege** - Grant only necessary ACLs to MCP server user
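A minimal sketch of practices 1-3 together, assuming a kafkajs client and that the `KAFKA_*` variables are set in the environment (the variable names are illustrative):
```typescript
import { Kafka } from 'kafkajs';

// Practice 1: credentials come from the environment, never from source control
const username = process.env.KAFKA_SASL_USERNAME;
const password = process.env.KAFKA_SASL_PASSWORD;
if (!username || !password) throw new Error('Kafka credentials not set');

const kafka = new Kafka({
  brokers: (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','),
  ssl: true, // Practice 3: encrypt traffic to the broker
  sasl: {
    mechanism: 'scram-sha-512', // Practice 2: strongest SASL mechanism first
    username,
    password,
  },
});
```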
## Related Commands
- `/specweave-kafka:mcp-configure` - Interactive MCP server setup
- `/specweave-kafka:dev-env start` - Start local Kafka for testing
- `/specweave-kafka:deploy` - Deploy production Kafka cluster
## External Links
- [kanapuli/mcp-kafka](https://github.com/kanapuli/mcp-kafka)
- [tuannvm/kafka-mcp-server](https://github.com/tuannvm/kafka-mcp-server)
- [Joel-hanson/kafka-mcp-server](https://github.com/Joel-hanson/kafka-mcp-server)
- [Confluent MCP Documentation](https://docs.confluent.io/platform/current/mcp/)
- [MCP Protocol Specification](https://modelcontextprotocol.org/)


@@ -0,0 +1,576 @@
---
name: kafka-observability
description: Kafka monitoring and observability expert. Guides Prometheus + Grafana setup, JMX metrics, alerting rules, and dashboard configuration. Activates for kafka monitoring, prometheus, grafana, kafka metrics, jmx exporter, kafka observability, monitoring setup, kafka dashboards, alerting, kafka performance monitoring, metrics collection.
---
# Kafka Monitoring & Observability
Expert guidance for implementing comprehensive monitoring and observability for Apache Kafka using Prometheus and Grafana.
## When to Use This Skill
I activate when you need help with:
- **Monitoring setup**: "Set up Kafka monitoring", "configure Prometheus for Kafka", "Grafana dashboards for Kafka"
- **Metrics collection**: "Kafka JMX metrics", "export Kafka metrics to Prometheus"
- **Alerting**: "Kafka alerting rules", "alert on under-replicated partitions", "critical Kafka metrics"
- **Troubleshooting**: "Monitor Kafka performance", "track consumer lag", "broker health monitoring"
## What I Know
### Available Monitoring Components
This plugin provides a complete monitoring stack:
#### 1. **Prometheus JMX Exporter Configuration**
- **Location**: `plugins/specweave-kafka/monitoring/prometheus/kafka-jmx-exporter.yml`
- **Purpose**: Export Kafka JMX metrics to Prometheus format
- **Metrics Exported**:
- Broker topic metrics (bytes in/out, messages in, request rate)
- Replica manager (under-replicated partitions, ISR shrinks/expands)
- Controller metrics (active controller, offline partitions, leader elections)
- Request metrics (produce/fetch latency)
- Log metrics (flush rate, flush latency)
- JVM metrics (heap, GC, threads, file descriptors)
#### 2. **Grafana Dashboards** (5 Dashboards)
- **Location**: `plugins/specweave-kafka/monitoring/grafana/dashboards/`
- **Dashboards**:
1. **kafka-cluster-overview.json** - Cluster health and throughput
2. **kafka-broker-metrics.json** - Per-broker performance
3. **kafka-consumer-lag.json** - Consumer lag monitoring
4. **kafka-topic-metrics.json** - Topic-level metrics
5. **kafka-jvm-metrics.json** - JVM health (heap, GC, threads)
#### 3. **Grafana Provisioning**
- **Location**: `plugins/specweave-kafka/monitoring/grafana/provisioning/`
- **Files**:
- `dashboards/kafka.yml` - Dashboard provisioning config
- `datasources/prometheus.yml` - Prometheus datasource config
## Setup Workflow 1: JMX Exporter (Self-Hosted Kafka)
For Kafka running on VMs or bare metal (non-Kubernetes).
### Step 1: Download JMX Prometheus Agent
```bash
# Download JMX Prometheus agent JAR
cd /opt
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar
# Copy JMX Exporter config
cp plugins/specweave-kafka/monitoring/prometheus/kafka-jmx-exporter.yml /opt/kafka-jmx-exporter.yml
```
### Step 2: Configure Kafka Broker
Add JMX exporter to Kafka startup script:
```bash
# Edit Kafka startup (e.g., /etc/systemd/system/kafka.service)
[Service]
Environment="KAFKA_OPTS=-javaagent:/opt/jmx_prometheus_javaagent-0.20.0.jar=7071:/opt/kafka-jmx-exporter.yml"
```
Or add to `kafka-server-start.sh`:
```bash
export KAFKA_OPTS="-javaagent:/opt/jmx_prometheus_javaagent-0.20.0.jar=7071:/opt/kafka-jmx-exporter.yml"
```
### Step 3: Restart Kafka and Verify
```bash
# Restart Kafka broker
sudo systemctl restart kafka
# Verify JMX exporter is running (port 7071)
curl localhost:7071/metrics | grep kafka_server
# Expected output: kafka_server_broker_topic_metrics_bytesin_total{...} 12345
```
### Step 4: Configure Prometheus Scraping
Add Kafka brokers to Prometheus config:
```yaml
# prometheus.yml
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets:
- 'kafka-broker-1:7071'
- 'kafka-broker-2:7071'
- 'kafka-broker-3:7071'
scrape_interval: 30s
```
```bash
# Reload Prometheus
sudo systemctl reload prometheus
# OR send SIGHUP
kill -HUP $(pidof prometheus)
# Verify scraping
curl http://localhost:9090/api/v1/targets | jq '.data.activeTargets[] | select(.job=="kafka")'
```
## Setup Workflow 2: Strimzi (Kubernetes)
For Kafka running on Kubernetes with Strimzi Operator.
### Step 1: Create JMX Exporter ConfigMap
```bash
# Create ConfigMap from JMX exporter config
kubectl create configmap kafka-metrics \
--from-file=kafka-metrics-config.yml=plugins/specweave-kafka/monitoring/prometheus/kafka-jmx-exporter.yml \
-n kafka
```
### Step 2: Configure Kafka CR with Metrics
```yaml
# kafka-cluster.yaml (add metricsConfig section)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-kafka-cluster
namespace: kafka
spec:
kafka:
version: 3.7.0
replicas: 3
# ... other config ...
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: kafka-metrics
key: kafka-metrics-config.yml
```
```bash
# Apply updated Kafka CR
kubectl apply -f kafka-cluster.yaml
# Verify metrics endpoint (wait for rolling restart)
kubectl exec -it kafka-my-kafka-cluster-0 -n kafka -- curl localhost:9404/metrics | grep kafka_server
```
### Step 3: Install Prometheus Operator (if not installed)
```bash
# Add Prometheus Community Helm repo
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
# Install kube-prometheus-stack (Prometheus + Grafana + Alertmanager)
helm install prometheus prometheus-community/kube-prometheus-stack \
--namespace monitoring \
--create-namespace \
--set prometheus.prometheusSpec.serviceMonitorSelectorNilUsesHelmValues=false \
--set prometheus.prometheusSpec.podMonitorSelectorNilUsesHelmValues=false
```
### Step 4: Create PodMonitor for Kafka
```yaml
# kafka-podmonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: kafka-metrics
namespace: kafka
labels:
app: strimzi
spec:
selector:
matchLabels:
strimzi.io/kind: Kafka
podMetricsEndpoints:
- port: tcp-prometheus
interval: 30s
```
```bash
# Apply PodMonitor
kubectl apply -f kafka-podmonitor.yaml
# Verify Prometheus is scraping Kafka
kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090
# Open: http://localhost:9090/targets
# Should see kafka-metrics/* targets
```
## Setup Workflow 3: Grafana Dashboards
### Installation (Docker Compose)
If using Docker Compose for local development:
```yaml
# docker-compose.yml (add to existing Kafka setup)
version: '3.8'
services:
# ... Kafka services ...
prometheus:
image: prom/prometheus:v2.48.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
grafana:
image: grafana/grafana:10.2.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- ./monitoring/grafana/provisioning:/etc/grafana/provisioning
- ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards
- grafana-data:/var/lib/grafana
volumes:
prometheus-data:
grafana-data:
```
```bash
# Start monitoring stack
docker-compose up -d prometheus grafana
# Access Grafana
# URL: http://localhost:3000
# Username: admin
# Password: admin
```
### Installation (Kubernetes)
Dashboards are auto-provisioned if using kube-prometheus-stack:
```bash
# Create ConfigMaps for each dashboard
for dashboard in plugins/specweave-kafka/monitoring/grafana/dashboards/*.json; do
name=$(basename "$dashboard" .json)
kubectl create configmap "kafka-dashboard-$name" \
--from-file="$dashboard" \
-n monitoring \
--dry-run=client -o yaml | kubectl apply -f -
done
# Label ConfigMaps for Grafana auto-discovery (kubectl does not glob resource
# names, so select the dashboard ConfigMaps explicitly)
kubectl get configmap -n monitoring -o name | grep '^configmap/kafka-dashboard-' \
  | xargs -I{} kubectl label {} -n monitoring grafana_dashboard=1 --overwrite
# Grafana will auto-import dashboards (wait 30-60 seconds)
# Access Grafana
kubectl port-forward -n monitoring svc/prometheus-grafana 3000:80
# URL: http://localhost:3000
# Username: admin
# Password: prom-operator (default kube-prometheus-stack password)
```
### Manual Dashboard Import
If auto-provisioning doesn't work:
```bash
# 1. Access Grafana UI
# 2. Go to: Dashboards → Import
# 3. Upload JSON files from:
# plugins/specweave-kafka/monitoring/grafana/dashboards/
# Or use Grafana API
for dashboard in plugins/specweave-kafka/monitoring/grafana/dashboards/*.json; do
curl -X POST http://admin:admin@localhost:3000/api/dashboards/db \
-H "Content-Type: application/json" \
-d @"$dashboard"
done
```
## Dashboard Overview
### 1. **Kafka Cluster Overview** (`kafka-cluster-overview.json`)
**Purpose**: High-level cluster health
**Key Metrics**:
- Active Controller Count (should be exactly 1)
- Under-Replicated Partitions (should be 0) ⚠️ CRITICAL
- Offline Partitions Count (should be 0) ⚠️ CRITICAL
- Unclean Leader Elections (should be 0)
- Cluster Throughput (bytes in/out per second)
- Request Rate (produce, fetch requests per second)
- ISR Changes (shrinks/expands)
- Leader Election Rate
**Use When**: Checking overall cluster health
### 2. **Kafka Broker Metrics** (`kafka-broker-metrics.json`)
**Purpose**: Per-broker performance
**Key Metrics**:
- Broker CPU Usage (% utilization)
- Broker Heap Memory Usage
- Broker Network Throughput (bytes in/out)
- Request Handler Idle Percentage (low = CPU saturation)
- File Descriptors (open vs max)
- Log Flush Latency (p50, p99)
- JVM GC Collection Count/Time
**Use When**: Investigating broker performance issues
### 3. **Kafka Consumer Lag** (`kafka-consumer-lag.json`)
**Purpose**: Consumer lag monitoring
**Key Metrics**:
- Consumer Lag per Topic/Partition
- Total Lag per Consumer Group
- Offset Commit Rate
- Current Consumer Offset
- Log End Offset (producer offset)
- Consumer Group Members
**Use When**: Troubleshooting slow consumers or lag spikes
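When a lag spike shows up on this dashboard, the same series can be polled programmatically through Prometheus's instant-query HTTP API. A hedged sketch: the metric name follows the Kafka Exporter convention used in the troubleshooting section below, and the group name and threshold are illustrative:
```typescript
// Query total lag for one consumer group via Prometheus's HTTP API
const PROM_URL = 'http://localhost:9090';

async function totalLag(group: string): Promise<number> {
  const query = `sum(kafka_consumergroup_lag{consumergroup="${group}"})`;
  const res = await fetch(`${PROM_URL}/api/v1/query?query=${encodeURIComponent(query)}`);
  const body = (await res.json()) as any;
  // Instant queries return a vector of { metric, value: [timestamp, "<value>"] }
  const sample = body.data.result[0];
  return sample ? Number(sample.value[1]) : 0;
}

const lag = await totalLag('payments-consumer');
if (lag > 10_000) {
  console.warn(`Consumer lag is ${lag} messages - scale or debug consumers`);
}
```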
### 4. **Kafka Topic Metrics** (`kafka-topic-metrics.json`)
**Purpose**: Topic-level metrics
**Key Metrics**:
- Messages Produced per Topic
- Bytes per Topic (in/out)
- Partition Count per Topic
- Replication Factor
- In-Sync Replicas
- Log Size per Partition
- Current Offset per Partition
- Partition Leader Distribution
**Use When**: Analyzing topic throughput and hotspots
### 5. **Kafka JVM Metrics** (`kafka-jvm-metrics.json`)
**Purpose**: JVM health monitoring
**Key Metrics**:
- Heap Memory Usage (used vs max)
- Heap Utilization Percentage
- GC Collection Rate (collections/sec)
- GC Collection Time (ms/sec)
- JVM Thread Count
- Heap Memory by Pool (young gen, old gen, survivor)
- Off-Heap Memory Usage (metaspace, code cache)
- GC Pause Time Percentiles (p50, p95, p99)
**Use When**: Investigating memory leaks or GC pauses
## Critical Alerts Configuration
Create Prometheus alerting rules for critical Kafka metrics:
```yaml
# kafka-alerts.yml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: kafka-alerts
namespace: monitoring
spec:
groups:
- name: kafka.rules
interval: 30s
rules:
# CRITICAL: Under-Replicated Partitions
- alert: KafkaUnderReplicatedPartitions
expr: sum(kafka_server_replica_manager_under_replicated_partitions) > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka has under-replicated partitions"
description: "{{ $value }} partitions are under-replicated. Data loss risk!"
# CRITICAL: Offline Partitions
- alert: KafkaOfflinePartitions
expr: kafka_controller_offline_partitions_count > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka has offline partitions"
description: "{{ $value }} partitions are offline. Service degradation!"
# CRITICAL: No Active Controller
- alert: KafkaNoActiveController
expr: kafka_controller_active_controller_count == 0
for: 1m
labels:
severity: critical
annotations:
summary: "No active Kafka controller"
description: "Cluster has no active controller. Cannot perform administrative operations!"
# WARNING: High Consumer Lag
- alert: KafkaConsumerLagHigh
expr: sum by (consumergroup) (kafka_consumergroup_lag) > 10000
for: 10m
labels:
severity: warning
annotations:
summary: "Consumer group {{ $labels.consumergroup }} has high lag"
description: "Lag is {{ $value }} messages. Consumers may be slow."
# WARNING: High CPU Usage
- alert: KafkaBrokerHighCPU
expr: os_process_cpu_load{job="kafka"} > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "Broker {{ $labels.instance }} has high CPU usage"
description: "CPU usage is {{ $value | humanizePercentage }}. Consider scaling."
# WARNING: Low Heap Memory
- alert: KafkaBrokerLowHeapMemory
expr: jvm_memory_heap_used_bytes{job="kafka"} / jvm_memory_heap_max_bytes{job="kafka"} > 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "Broker {{ $labels.instance }} has low heap memory"
description: "Heap usage is {{ $value | humanizePercentage }}. Risk of OOM!"
# WARNING: High GC Time
- alert: KafkaBrokerHighGCTime
expr: rate(jvm_gc_collection_time_ms_total{job="kafka"}[5m]) > 500
for: 5m
labels:
severity: warning
annotations:
summary: "Broker {{ $labels.instance }} spending too much time in GC"
description: "GC time is {{ $value }}ms/sec. Application pauses likely."
```
```bash
# Apply alerts (Kubernetes)
kubectl apply -f kafka-alerts.yml
# Verify alerts loaded
kubectl get prometheusrules -n monitoring
```
## Troubleshooting
### "Prometheus not scraping Kafka metrics"
**Symptoms**: No Kafka metrics in Prometheus
**Fix**:
```bash
# 1. Verify JMX exporter is running
curl http://kafka-broker:7071/metrics
# 2. Check Prometheus targets
curl http://localhost:9090/api/v1/targets | jq '.data.activeTargets[] | select(.job=="kafka")'
# 3. Check Prometheus logs
kubectl logs -n monitoring prometheus-kube-prometheus-prometheus-0
# Common issues:
# - Firewall blocking port 7071
# - Incorrect scrape config
# - Kafka broker not running
```
### "Grafana dashboards not loading"
**Symptoms**: Dashboards show "No data"
**Fix**:
```bash
# 1. Verify Prometheus datasource
# Grafana UI → Configuration → Data Sources → Prometheus → Test
# 2. Check if Kafka metrics exist in Prometheus
# Prometheus UI → Graph → Enter: kafka_server_broker_topic_metrics_bytesin_total
# 3. Verify dashboard queries match your Prometheus job name
# Dashboard panels use job="kafka" by default
# If your job name is different, update dashboard JSON
```
### "Consumer lag metrics missing"
**Symptoms**: Consumer lag dashboard empty
**Fix**:
Consumer lag metrics require **Kafka Exporter** (separate from JMX Exporter):
```bash
# Install Kafka Exporter (Kubernetes)
helm install kafka-exporter prometheus-community/prometheus-kafka-exporter \
--namespace monitoring \
--set kafkaServer={kafka-bootstrap:9092}
# Or run as Docker container
docker run -d -p 9308:9308 \
danielqsj/kafka-exporter \
--kafka.server=kafka:9092 \
--web.listen-address=:9308
# Add to Prometheus scrape config
scrape_configs:
- job_name: 'kafka-exporter'
static_configs:
- targets: ['kafka-exporter:9308']
```
## Integration with Other Skills
- **kafka-iac-deployment**: Set up monitoring during Terraform deployment
- **kafka-kubernetes**: Configure monitoring for Strimzi Kafka on K8s
- **kafka-architecture**: Use cluster sizing metrics to validate capacity planning
- **kafka-cli-tools**: Use kcat to generate test traffic and verify metrics
## Quick Reference Commands
```bash
# Check JMX exporter metrics
curl http://localhost:7071/metrics | grep -E "(kafka_server|kafka_controller)"
# Prometheus query examples
curl -g 'http://localhost:9090/api/v1/query?query=kafka_server_replica_manager_under_replicated_partitions'
# Grafana dashboard export
curl http://admin:admin@localhost:3000/api/dashboards/uid/kafka-cluster-overview | jq .dashboard > backup.json
# Reload Prometheus config
kill -HUP $(pidof prometheus)
# Check Prometheus targets
curl http://localhost:9090/api/v1/targets | jq '.data.activeTargets[] | select(.job=="kafka")'
```
---
**Next Steps After Monitoring Setup**:
1. Review all 5 Grafana dashboards to familiarize yourself with metrics
2. Set up alerting (Slack, PagerDuty, email)
3. Create runbooks for critical alerts (under-replicated partitions, offline partitions, no controller)
4. Monitor for 7 days to establish baseline metrics
5. Tune JVM settings based on GC metrics