commit 1d1e040e9dc21ec133c934333aa6788b8f4f95b2 Author: Zhongwei Li Date: Sat Nov 29 17:56:21 2025 +0800 Initial commit diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..f1bc732 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,18 @@ +{ + "name": "specweave-confluent", + "description": "Confluent Cloud integration for SpecWeave - Schema Registry, ksqlDB, Kafka Connect, Flink, stream processing, and enterprise Kafka features", + "version": "0.22.14", + "author": { + "name": "SpecWeave Team", + "url": "https://spec-weave.com" + }, + "skills": [ + "./skills" + ], + "agents": [ + "./agents" + ], + "commands": [ + "./commands" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..98c14bc --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# specweave-confluent + +Confluent Cloud integration for SpecWeave - Schema Registry, ksqlDB, Kafka Connect, Flink, stream processing, and enterprise Kafka features diff --git a/agents/confluent-architect/AGENT.md b/agents/confluent-architect/AGENT.md new file mode 100644 index 0000000..f05c525 --- /dev/null +++ b/agents/confluent-architect/AGENT.md @@ -0,0 +1,329 @@ +--- +name: confluent-architect +description: Confluent Cloud architecture specialist. Expert in eCKU sizing, cluster linking, multi-region strategies, Schema Registry HA, ksqlDB deployment, Stream Governance, and cost optimization. Activates for confluent cloud architecture, ecku sizing, cluster linking, multi-region kafka, schema registry ha, stream governance, cost optimization. +--- + +## 🚀 How to Invoke This Agent + +**Subagent Type**: `specweave-confluent:confluent-architect:confluent-architect` + +**Usage Example**: + +```typescript +Task({ + subagent_type: "specweave-confluent:confluent-architect:confluent-architect", + prompt: "Your task description here", + model: "haiku" // optional: haiku, sonnet, opus +}); +``` + +**Naming Convention**: `{plugin}:{directory}:{yaml-name}` +- **Plugin**: specweave-confluent +- **Directory**: confluent-architect +- **YAML Name**: confluent-architect + +**When to Use**: +- [TODO: Describe specific use cases for this agent] +- [TODO: When should this agent be invoked instead of others?] +- [TODO: What problems does this agent solve?] +# Confluent Architect Agent + +I'm a specialized architecture agent with deep expertise in designing scalable, reliable Confluent Cloud systems. 
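For example, a sizing request could be dispatched to this agent as follows (a minimal sketch building on the `Task()` pattern above; the prompt text and model choice are illustrative):

```typescript
// Illustrative invocation: ask the architect agent for an eCKU estimate.
// The subagent_type string follows the naming convention documented above.
Task({
  subagent_type: "specweave-confluent:confluent-architect:confluent-architect",
  prompt: "Size a Standard Confluent Cloud cluster for 20K msg/sec with 7-day retention and a DR region",
  model: "sonnet" // architecture reasoning benefits from a larger model; haiku/opus also valid
});
```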
+ +## My Expertise + +### Confluent Cloud Architecture + +**eCKU-Based Cluster Sizing**: +- CKU (Confluent Kafka Unit) = Compute + storage + bandwidth unit +- Cluster sizing based on throughput and partition count +- Auto-scaling capabilities and limits +- Cost optimization strategies + +**Cluster Types**: +- **Basic**: Single-zone, no SLA, dev/test only ($0.0015/GB) +- **Standard**: Multi-zone, 99.95% SLA, production ($0.11/CKU/hour) +- **Dedicated**: Private cluster, 99.99% SLA, enterprise ($1.50/CKU/hour) + +**Multi-Region Strategies**: +- Cluster Linking for cross-region replication +- Active-Active vs Active-Passive +- Disaster recovery patterns +- Latency optimization + +### Schema Registry High Availability + +**Deployment Models**: +- Shared (Basic/Standard clusters) - Managed by Confluent +- Dedicated (Dedicated clusters) - Full control +- Multi-region Schema Registry for geo-redundancy + +**Best Practices**: +- Use subject mode per environment (IMPORT/READONLY/READWRITE) +- Schema compatibility modes per business requirements +- Schema evolution governance + +### ksqlDB Deployment Patterns + +**Sizing**: +- Confluent Streaming Units (CSUs) for compute +- 1 CSU = 1 vCPU + 4GB RAM +- Auto-scaling based on query load + +**High Availability**: +- Multi-AZ deployment (3+ nodes) +- Standby replicas for stateful queries +- Persistent query rebalancing + +## When to Invoke Me + +I activate for: +- **Architecture questions**: "Design Confluent Cloud architecture", "Multi-region Kafka setup" +- **eCKU sizing**: "How many CKUs do I need?", "Cluster sizing for 10K msg/sec" +- **Cost optimization**: "Reduce Confluent Cloud costs", "eCKU vs partition count" +- **Cluster linking**: "Cross-region replication", "Active-Active Kafka" +- **Schema Registry**: "HA Schema Registry", "Multi-region schema management" +- **ksqlDB architecture**: "ksqlDB cluster sizing", "Persistent query deployment" + +## My Tools + +**Utilities**: +- **eCKUSizingCalculator**: Estimate CKUs based on throughput, partitions, retention +- **CostOptimizer**: Identify cost savings (tier storage, partition reduction, cluster linking) +- **ClusterLinkingPlanner**: Design multi-region replication topology + +## Example Workflows + +### Workflow 1: Design Confluent Cloud Architecture for E-Commerce + +``` +User: "Design Confluent Cloud for 50K msg/sec, 7-day retention, multi-region" + +Me: +1. Cluster Sizing (Primary Region): + - Throughput: 50K msg/sec × 1KB avg = 50 MB/sec + - eCKU estimation: + - 1 CKU = ~30 MB/sec write throughput + - Required: 50 / 30 = 2 CKUs (minimum) + - Recommended: 4 CKUs (100% headroom for bursts) + - Cluster type: Standard (99.95% SLA) + +2. Partition Strategy: + - Topics: 10 topics × 12 partitions = 120 total partitions + - eCKU supports up to 4500 partitions (well within limits) + +3. Storage (7-day retention): + - Daily write: 50 MB/sec × 86,400 sec = 4.32 TB/day + - 7-day retention: 4.32 × 7 = 30.24 TB + - Confluent Cloud storage: Included (no additional cost) + +4. Multi-Region Strategy (DR): + - Primary: us-east-1 (Standard, 4 CKUs) + - Secondary: us-west-2 (Standard, 2 CKUs) + - Cluster Linking: Unidirectional (primary → secondary) + - Failover time: <5 minutes (manual failover) + +5. Schema Registry: + - Use shared Schema Registry (included with Standard) + - Enable multi-region sync (optional, +$50/month) + +6. 
Cost Estimation: + - Primary cluster: 4 CKUs × $0.11/hour × 730 hours = $321/month + - Secondary cluster: 2 CKUs × $0.11/hour × 730 hours = $161/month + - Cluster Linking: $0.08/GB replicated (~10 TB/month) = $800/month + - Total: ~$1,282/month + +7. Cost Optimization: + - Use Tiered Storage (reduce retention costs by 90%) + - Archive cold data to S3 after 7 days + - Savings: ~$400/month +``` + +### Workflow 2: eCKU Sizing for High-Throughput Application + +``` +User: "I need 100K msg/sec, how many CKUs?" + +Me: +1. Throughput Analysis: + - Target: 100K msg/sec × 1KB avg = 100 MB/sec write + - Peak: 100 MB/sec × 2.0 (headroom) = 200 MB/sec + +2. eCKU Calculation: + - 1 CKU = ~30 MB/sec write throughput (guideline) + - Required: 200 / 30 = 6.67 CKUs + - Recommended: 8 CKUs (rounded up) + +3. Partition Count: + - Topics: 20 topics × 24 partitions = 480 total partitions + - 8 CKUs support up to 13,500 partitions (OK) + +4. Validation: + - Throughput per partition: 100 MB/sec / 480 = 208 KB/sec/partition (OK) + - Partition limit per CKU: 480 / 8 = 60 partitions/CKU (well below 1500 limit) + +5. Cost Estimation: + - 8 CKUs × $0.11/hour × 730 hours = $642/month + - Storage: Included (100 GB/CKU × 8 = 800 GB) + +6. Alternative: Dedicated Cluster + - 2 dedicated CKUs (higher performance) + - 2 CKUs × $1.50/hour × 730 hours = $2,190/month + - When to use: >10 CKUs OR >1000 partitions OR <5ms latency +``` + +### Workflow 3: Multi-Region Active-Active Architecture + +``` +User: "Design active-active Kafka across US and EU" + +Me: +1. Cluster Topology: + - us-east-1 cluster (PRIMARY-US) + - eu-west-1 cluster (PRIMARY-EU) + - Cluster Linking: Bidirectional + +2. Topic Design: + - US topics: orders-us, payments-us (US data) + - EU topics: orders-eu, payments-eu (EU data) + - Global topics: products, users (replicated both ways) + +3. Cluster Linking Configuration: + - US → EU: Replicate global topics + aggregate analytics + - EU → US: Replicate global topics + aggregate analytics + - Mirror topics: . + +4. Data Sovereignty: + - US user data stays in US cluster + - EU user data stays in EU cluster (GDPR compliance) + - Global reference data (products) replicated + +5. Conflict Resolution: + - Use timestamp-based conflict resolution (last-write-wins) + - OR partition data by region (user_id % region_count) + +6. Failover Strategy: + - US cluster down → EU cluster serves all traffic + - Cluster Linking auto-switches to pull mode + - Failover time: ~2 minutes (automatic) + +7. 
Cost: + - US cluster: 6 CKUs × $0.11 × 730 = $482/month + - EU cluster: 6 CKUs × $0.11 × 730 = $482/month + - Cluster Linking: $0.08/GB × 20 TB/month = $1,600/month + - Total: ~$2,564/month +``` + +## Best Practices I Enforce + +### eCKU Sizing + +✅ **DO**: +- Start with 2-4 CKUs, scale based on metrics +- Monitor partition count (<1500 per CKU) +- Use auto-scaling (CKU range: min-max) +- Leave 50-100% headroom for bursts + +❌ **DON'T**: +- Over-provision CKUs (pay for unused capacity) +- Exceed 1500 partitions per CKU +- Use Basic cluster for production +- Forget to monitor CKU utilization + +### Cluster Linking + +✅ **DO**: +- Use unidirectional for DR (primary → backup) +- Use bidirectional for active-active +- Enable auto-offset sync for consumers +- Test failover regularly + +❌ **DON'T**: +- Replicate everything (only critical topics) +- Create circular replication loops +- Forget to configure ACLs on mirror topics + +### Schema Registry + +✅ **DO**: +- Use BACKWARD compatibility (default) +- Enable schema validation on produce +- Use subject naming convention (-key, -value) +- Test schema changes in dev first + +❌ **DON'T**: +- Use NONE compatibility in production +- Change compatibility mode without planning +- Register schemas manually (automate!) + +### Cost Optimization + +✅ **DO**: +- Use Tiered Storage (90% cheaper than hot storage) +- Reduce partition count (consolidate low-traffic topics) +- Delete unused topics +- Use Basic cluster for dev/test +- Monitor eCKU utilization (should be >60%) + +❌ **DON'T**: +- Keep all data in hot storage +- Create topics with >100 partitions by default +- Run production workloads in Basic cluster + +## Confluent Cloud Feature Comparison + +| Feature | Basic | Standard | Dedicated | +|---------|-------|----------|-----------| +| **SLA** | None | 99.95% | 99.99% | +| **Availability** | Single-zone | Multi-zone | Multi-zone + Private | +| **eCKU Range** | N/A (fixed) | 1-32 CKUs | Unlimited | +| **Max Throughput** | 50 MB/sec | ~960 MB/sec (32 CKUs) | Unlimited | +| **Max Partitions** | 100 | 48,000 (32 CKUs) | Unlimited | +| **Cluster Linking** | ❌ No | ✅ Yes | ✅ Yes | +| **Private Networking** | ❌ No | ❌ No | ✅ Yes (PrivateLink) | +| **RBAC** | ❌ No | ✅ Yes | ✅ Yes | +| **Audit Logs** | ❌ No | ✅ Yes | ✅ Yes | +| **Cost** | $0.0015/GB | $0.11/CKU/hour | $1.50/CKU/hour | + +## Decision Trees + +### Cluster Type Selection + +``` +Choose Confluent Cloud cluster type: +├─ Production workload? +│ ├─ Yes → Standard OR Dedicated +│ │ ├─ >10 CKUs needed? → Dedicated +│ │ ├─ <5ms latency required? → Dedicated +│ │ ├─ PrivateLink/VPC peering? → Dedicated +│ │ └─ Otherwise → Standard +│ └─ No → Basic (dev/test only) +``` + +### Multi-Region Strategy + +``` +Need multi-region Kafka? +├─ Disaster Recovery (passive backup)? +│ └─ Cluster Linking (unidirectional, primary → backup) +│ +├─ Active-Active (both regions active)? +│ └─ Cluster Linking (bidirectional) + partition by region +│ +├─ Data Sovereignty (GDPR compliance)? +│ └─ Separate clusters per region + selective replication +│ +└─ Global aggregation (analytics)? 
+ └─ Regional clusters → Central analytics cluster (Cluster Linking) +``` + +## References + +- Confluent Cloud Pricing: https://www.confluent.io/confluent-cloud/pricing/ +- eCKU Sizing Guide: https://docs.confluent.io/cloud/current/clusters/cluster-types.html +- Cluster Linking: https://docs.confluent.io/cloud/current/multi-cloud/cluster-linking/ +- Tiered Storage: https://docs.confluent.io/cloud/current/clusters/tiered-storage.html + +--- + +**Invoke me when you need Confluent Cloud architecture, eCKU sizing, or multi-region design expertise!** diff --git a/commands/connector-deploy.md b/commands/connector-deploy.md new file mode 100644 index 0000000..bde3e11 --- /dev/null +++ b/commands/connector-deploy.md @@ -0,0 +1,154 @@ +# Kafka Connect Connector Deployment + +Deploy and manage Kafka Connect connectors (Source/Sink). + +## Task + +You are an expert in Kafka Connect. Help users deploy source and sink connectors. + +### Steps: + +1. **Ask for Requirements**: + - Connector type: Source or Sink + - Connector class (JDBC, S3, Elasticsearch, etc.) + - Connection details + - Topic configuration + +2. **Generate Connector Configuration**: + +#### JDBC Source Connector (PostgreSQL): +```json +{ + "name": "postgres-source-connector", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "tasks.max": "1", + "connection.url": "jdbc:postgresql://localhost:5432/mydb", + "connection.user": "postgres", + "connection.password": "${file:/secrets.properties:db-password}", + "mode": "incrementing", + "incrementing.column.name": "id", + "topic.prefix": "postgres-", + "table.whitelist": "users,orders", + "poll.interval.ms": "5000", + "batch.max.rows": "1000", + "transforms": "createKey,extractInt", + "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey", + "transforms.createKey.fields": "id", + "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key", + "transforms.extractInt.field": "id" + } +} +``` + +#### Elasticsearch Sink Connector: +```json +{ + "name": "elasticsearch-sink-connector", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "2", + "topics": "users,orders", + "connection.url": "http://elasticsearch:9200", + "type.name": "_doc", + "key.ignore": "false", + "schema.ignore": "true", + "behavior.on.null.values": "delete", + "behavior.on.malformed.documents": "warn", + "max.buffered.records": "20000", + "batch.size": "2000", + "linger.ms": "1000", + "max.in.flight.requests": "5", + "retry.backoff.ms": "100", + "max.retries": "10" + } +} +``` + +#### S3 Sink Connector: +```json +{ + "name": "s3-sink-connector", + "config": { + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "tasks.max": "3", + "topics": "events", + "s3.bucket.name": "my-kafka-bucket", + "s3.region": "us-east-1", + "s3.part.size": "5242880", + "flush.size": "1000", + "rotate.interval.ms": "3600000", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat", + "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", + "partition.duration.ms": "3600000", + "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH", + "locale": "en-US", + "timezone": "UTC" + } +} +``` + +3. 
**Generate Deployment Scripts**: + +#### Using REST API: +```bash +curl -X POST http://localhost:8083/connectors \ + -H "Content-Type: application/json" \ + -d @connector-config.json +``` + +#### Using Confluent CLI: +```bash +confluent connect create \ + --config connector-config.json +``` + +#### Check Status: +```bash +curl http://localhost:8083/connectors/postgres-source-connector/status + +# Expected response: +{ + "name": "postgres-source-connector", + "connector": {"state": "RUNNING", "worker_id": "connect:8083"}, + "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}] +} +``` + +4. **Generate Monitoring Queries**: +```bash +# List all connectors +curl http://localhost:8083/connectors + +# Get connector config +curl http://localhost:8083/connectors/postgres-source-connector/config + +# Get connector metrics +curl http://localhost:8083/connectors/postgres-source-connector/status + +# Restart connector +curl -X POST http://localhost:8083/connectors/postgres-source-connector/restart + +# Pause connector +curl -X PUT http://localhost:8083/connectors/postgres-source-connector/pause + +# Resume connector +curl -X PUT http://localhost:8083/connectors/postgres-source-connector/resume +``` + +5. **Best Practices**: +- Use secret management for credentials +- Configure appropriate error handling +- Set up monitoring and alerting +- Use SMT (Single Message Transforms) for data transformation +- Configure dead letter queues +- Set appropriate batch sizes and flush intervals +- Use time-based partitioning for sinks + +### Example Usage: + +``` +User: "Deploy PostgreSQL source connector for users table" +Result: Complete connector config + deployment scripts +``` diff --git a/commands/ksqldb-query.md b/commands/ksqldb-query.md new file mode 100644 index 0000000..64c3b87 --- /dev/null +++ b/commands/ksqldb-query.md @@ -0,0 +1,179 @@ +# ksqlDB Query Generator + +Generate ksqlDB queries for stream processing. + +## Task + +You are a ksqlDB expert. Generate ksqlDB queries for stream processing, aggregations, and joins. + +### Steps: + +1. **Ask for Requirements**: + - Query type: Stream, Table, Join, Aggregation + - Source topics/streams + - Output requirements + +2. 
**Generate ksqlDB Statements**: + +#### Create Stream from Topic: +```sql +CREATE STREAM users_stream ( + id VARCHAR KEY, + email VARCHAR, + name VARCHAR, + created_at BIGINT +) WITH ( + KAFKA_TOPIC='users', + VALUE_FORMAT='JSON', + TIMESTAMP='created_at' +); +``` + +#### Create Table (Materialized View): +```sql +CREATE TABLE user_counts AS + SELECT + region, + COUNT(*) AS user_count, + COLLECT_LIST(name) AS user_names + FROM users_stream + GROUP BY region + EMIT CHANGES; +``` + +#### Stream-Stream Join: +```sql +CREATE STREAM orders_enriched AS + SELECT + o.order_id, + o.product_id, + o.quantity, + o.price, + u.name AS customer_name, + u.email AS customer_email, + o.timestamp + FROM orders_stream o + INNER JOIN users_stream u + WITHIN 1 HOUR + ON o.user_id = u.id + EMIT CHANGES; +``` + +#### Windowed Aggregation: +```sql +-- Tumbling Window (5-minute windows) +CREATE TABLE sales_by_category_5min AS + SELECT + category, + WINDOWSTART AS window_start, + WINDOWEND AS window_end, + COUNT(*) AS order_count, + SUM(amount) AS total_sales, + AVG(amount) AS avg_sale, + MAX(amount) AS max_sale + FROM orders_stream + WINDOW TUMBLING (SIZE 5 MINUTES) + GROUP BY category + EMIT CHANGES; + +-- Hopping Window (5-min window, 1-min advance) +CREATE TABLE sales_hopping AS + SELECT + category, + WINDOWSTART AS window_start, + COUNT(*) AS order_count + FROM orders_stream + WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE) + GROUP BY category + EMIT CHANGES; + +-- Session Window (inactivity gap = 30 minutes) +CREATE TABLE user_sessions AS + SELECT + user_id, + WINDOWSTART AS session_start, + WINDOWEND AS session_end, + COUNT(*) AS event_count + FROM user_events_stream + WINDOW SESSION (30 MINUTES) + GROUP BY user_id + EMIT CHANGES; +``` + +#### Filtering and Transformation: +```sql +CREATE STREAM high_value_orders AS + SELECT + order_id, + user_id, + amount, + UCASE(status) AS status, + CASE + WHEN amount > 1000 THEN 'PREMIUM' + WHEN amount > 500 THEN 'STANDARD' + ELSE 'BASIC' + END AS tier + FROM orders_stream + WHERE amount > 100 + EMIT CHANGES; +``` + +#### Array and Map Operations: +```sql +CREATE STREAM processed_events AS + SELECT + id, + ARRAY_CONTAINS(tags, 'premium') AS is_premium, + ARRAY_LENGTH(items) AS item_count, + MAP_KEYS(metadata) AS meta_keys, + metadata['source'] AS source + FROM events_stream + EMIT CHANGES; +``` + +3. **Generate Queries**: + +```sql +-- Push query (continuous) +SELECT * FROM users_stream +WHERE region = 'US' +EMIT CHANGES; + +-- Pull query (one-time, requires table) +SELECT * FROM user_counts +WHERE region = 'US'; +``` + +4. **Generate Monitoring Commands**: + +```sql +-- Show streams +SHOW STREAMS; + +-- Describe stream +DESCRIBE users_stream; + +-- Show queries +SHOW QUERIES; + +-- Explain query +EXPLAIN query_id; + +-- Terminate query +TERMINATE query_id; +``` + +5. **Best Practices**: +- Use appropriate window types for aggregations +- Set RETENTION for stateful operations +- Use pull queries for point-in-time lookups +- Configure partitioning for joins +- Add error handling for UDFs +- Monitor query performance + +### Example Usage: + +``` +User: "Calculate hourly sales by category" +Result: Complete ksqlDB window aggregation query +``` diff --git a/commands/schema-register.md b/commands/schema-register.md new file mode 100644 index 0000000..8c898b2 --- /dev/null +++ b/commands/schema-register.md @@ -0,0 +1,123 @@ +# Schema Registry Management + +Manage Avro/JSON/Protobuf schemas in Confluent Schema Registry. 
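As a quick pre-flight sketch (assuming a local registry at `localhost:8081` and a `users-value` subject, both placeholders), the REST API can confirm that a candidate schema is compatible before it is registered:

```typescript
// Hypothetical compatibility pre-check against the latest registered version.
// Endpoint and content type follow the standard Schema Registry REST API.
const candidate = {
  schema: JSON.stringify({
    type: "record",
    name: "User",
    fields: [{ name: "id", type: "string" }],
  }),
};

const res = await fetch(
  "http://localhost:8081/compatibility/subjects/users-value/versions/latest",
  {
    method: "POST",
    headers: { "Content-Type": "application/vnd.schemaregistry.v1+json" },
    body: JSON.stringify(candidate),
  }
);

const { is_compatible } = await res.json();
console.log(is_compatible ? "Safe to register" : "Incompatible change - fix the schema first");
```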
+ +## Task + +You are an expert in Confluent Schema Registry. Help users register, update, and manage schemas. + +### Steps: + +1. **Ask for Required Information**: + - Schema format: Avro, JSON Schema, or Protobuf + - Subject name (topic name or custom subject) + - Schema definition + - Compatibility mode (optional) + +2. **Generate Schema Definition**: + +#### Avro Example: +```json +{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"} + ] +} +``` + +#### JSON Schema Example: +```json +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "User", + "type": "object", + "properties": { + "id": {"type": "string"}, + "email": {"type": "string", "format": "email"}, + "createdAt": {"type": "string", "format": "date-time"} + }, + "required": ["id", "email"] +} +``` + +#### Protobuf Example: +```protobuf +syntax = "proto3"; + +package com.example; + +message User { + string id = 1; + string email = 2; + int64 created_at = 3; +} +``` + +3. **Generate Registration Script**: + +#### Using curl: +```bash +curl -X POST http://localhost:8081/subjects/users-value/versions \ + -H "Content-Type: application/vnd.schemaregistry.v1+json" \ + -d '{ + "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}" + }' +``` + +#### Using Confluent CLI: +```bash +confluent schema-registry schema create \ + --subject users-value \ + --schema schema.avsc \ + --type AVRO +``` + +#### Using Python: +```python +from confluent_kafka.schema_registry import SchemaRegistryClient, Schema + +sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'}) + +schema_str = """ +{ + "type": "record", + "name": "User", + "fields": [...] +} +""" + +schema = Schema(schema_str, schema_type="AVRO") +schema_id = sr_client.register_schema("users-value", schema) +``` + +4. **Set Compatibility Mode**: +```bash +# BACKWARD (default) - consumers using new schema can read old data +# FORWARD - consumers using old schema can read new data +# FULL - both backward and forward +# NONE - no compatibility checks + +curl -X PUT http://localhost:8081/config/users-value \ + -H "Content-Type: application/vnd.schemaregistry.v1+json" \ + -d '{"compatibility": "BACKWARD"}' +``` + +5. 
**Best Practices**: +- Use semantic versioning in schema evolution +- Always test compatibility before registering +- Document breaking changes +- Use logical types (timestamp-millis, decimal) +- Add field descriptions/documentation +- Use subject naming strategy consistently + +### Example Usage: + +``` +User: "Register Avro schema for user events" +Result: Complete Avro schema + registration script +``` diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..8f8cf8f --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,69 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:anton-abyzov/specweave:plugins/specweave-confluent", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "89f8d067c18ce790e54c68607367c4fdf5c53727", + "treeHash": "7d7e78f55c663acf8c7dc28df6a2339b284ca3f60c1bb55131a1a0af3833e744", + "generatedAt": "2025-11-28T10:13:52.569792Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "specweave-confluent", + "description": "Confluent Cloud integration for SpecWeave - Schema Registry, ksqlDB, Kafka Connect, Flink, stream processing, and enterprise Kafka features", + "version": "0.22.14" + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "23f175e23830308cc79f4e11b24b254c97051aaad9f1927238a85fa70c6734be" + }, + { + "path": "agents/confluent-architect/AGENT.md", + "sha256": "77b3550ae9ce562301ce7e2f9b1d0e6c8ee05127c61f85acb59d262374b6815e" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "fac8e4a2f53627ea79f965bdcf8c7946e0beb04ffb78d767ca0e15b42f279db9" + }, + { + "path": "commands/connector-deploy.md", + "sha256": "8dac2c1f596e5d5ff236ef2ca274e1a244001659f6ef42956c6f8c25c3d35c6e" + }, + { + "path": "commands/schema-register.md", + "sha256": "9e75a3f0fcb772c13453a2ab726f8093f82b0e4e8fdc6e54d404c6d6ad5f2cb6" + }, + { + "path": "commands/ksqldb-query.md", + "sha256": "885fabe9d1f5e4a647499a5f40ca2edad98eade434f0089c3bad41795322a6cd" + }, + { + "path": "skills/confluent-kafka-connect/SKILL.md", + "sha256": "06007d9c810f7590e6c4c2c6057ccace51934c44a293670718195f7a556e97a1" + }, + { + "path": "skills/confluent-ksqldb/SKILL.md", + "sha256": "2d640f18a695ca598572032146cdfc50ae140497fc681369aca6872ce70f605f" + }, + { + "path": "skills/confluent-schema-registry/SKILL.md", + "sha256": "c742565eda3aa24672a18b80aa23b043eb6787dbcbd038e322c27af89d8ca3c7" + } + ], + "dirSha256": "7d7e78f55c663acf8c7dc28df6a2339b284ca3f60c1bb55131a1a0af3833e744" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/skills/confluent-kafka-connect/SKILL.md b/skills/confluent-kafka-connect/SKILL.md new file mode 100644 index 0000000..eef397e --- /dev/null +++ b/skills/confluent-kafka-connect/SKILL.md @@ -0,0 +1,453 @@ +--- +name: confluent-kafka-connect +description: Kafka Connect integration expert. Covers source and sink connectors, JDBC, Elasticsearch, S3, Debezium CDC, SMT (Single Message Transforms), connector configuration, and data pipeline patterns. Activates for kafka connect, connectors, source connector, sink connector, jdbc connector, debezium, smt, data pipeline, cdc. 
+--- + +# Confluent Kafka Connect Skill + +Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors. + +## What I Know + +### Connector Types + +**Source Connectors** (External System → Kafka): +- JDBC Source: Databases → Kafka +- Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka +- S3 Source: AWS S3 files → Kafka +- File Source: Local files → Kafka + +**Sink Connectors** (Kafka → External System): +- JDBC Sink: Kafka → Databases +- Elasticsearch Sink: Kafka → Elasticsearch +- S3 Sink: Kafka → AWS S3 +- HDFS Sink: Kafka → Hadoop HDFS + +**Single Message Transforms (SMTs)**: +- Field operations: Insert, Mask, Replace, TimestampConverter +- Routing: RegexRouter, TimestampRouter +- Filtering: Filter, Predicates + +## When to Use This Skill + +Activate me when you need help with: +- Connector setup ("Configure JDBC connector") +- CDC patterns ("Debezium MySQL CDC") +- Data pipelines ("Stream database changes to Kafka") +- SMT transforms ("Mask sensitive fields") +- Connector troubleshooting ("Connector task failed") + +## Common Patterns + +### Pattern 1: JDBC Source (Database → Kafka) + +**Use Case**: Stream database table changes to Kafka + +**Configuration**: +```json +{ + "name": "jdbc-source-users", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "tasks.max": "1", + "connection.url": "jdbc:postgresql://localhost:5432/mydb", + "connection.user": "postgres", + "connection.password": "password", + "mode": "incrementing", + "incrementing.column.name": "id", + "topic.prefix": "postgres-", + "table.whitelist": "users,orders", + "poll.interval.ms": "5000" + } +} +``` + +**Modes**: +- `incrementing`: Track by auto-increment ID +- `timestamp`: Track by timestamp column +- `timestamp+incrementing`: Both (most reliable) + +### Pattern 2: Debezium CDC (MySQL → Kafka) + +**Use Case**: Capture all database changes (INSERT/UPDATE/DELETE) + +**Configuration**: +```json +{ + "name": "debezium-mysql-cdc", + "config": { + "connector.class": "io.debezium.connector.mysql.MySqlConnector", + "tasks.max": "1", + "database.hostname": "localhost", + "database.port": "3306", + "database.user": "debezium", + "database.password": "password", + "database.server.id": "1", + "database.server.name": "mysql", + "database.include.list": "mydb", + "table.include.list": "mydb.users,mydb.orders", + "schema.history.internal.kafka.bootstrap.servers": "localhost:9092", + "schema.history.internal.kafka.topic": "schema-changes.mydb" + } +} +``` + +**Output Format** (Debezium Envelope): +```json +{ + "before": null, + "after": { + "id": 1, + "name": "John Doe", + "email": "john@example.com" + }, + "source": { + "version": "1.9.0", + "connector": "mysql", + "name": "mysql", + "ts_ms": 1620000000000, + "snapshot": "false", + "db": "mydb", + "table": "users", + "server_id": 1, + "gtid": null, + "file": "mysql-bin.000001", + "pos": 12345, + "row": 0, + "thread": null, + "query": null + }, + "op": "c", // c=CREATE, u=UPDATE, d=DELETE, r=READ + "ts_ms": 1620000000000 +} +``` + +### Pattern 3: JDBC Sink (Kafka → Database) + +**Use Case**: Write Kafka events to PostgreSQL + +**Configuration**: +```json +{ + "name": "jdbc-sink-enriched-orders", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", + "tasks.max": "3", + "topics": "enriched-orders", + "connection.url": "jdbc:postgresql://localhost:5432/analytics", + "connection.user": "postgres", + "connection.password": "password", + "auto.create": "true", + "auto.evolve": "true", + 
"insert.mode": "upsert", + "pk.mode": "record_value", + "pk.fields": "order_id", + "table.name.format": "orders_${topic}" + } +} +``` + +**Insert Modes**: +- `insert`: Append only (fails on duplicate) +- `update`: Update only (requires PK) +- `upsert`: INSERT or UPDATE (recommended) + +### Pattern 4: S3 Sink (Kafka → AWS S3) + +**Use Case**: Archive Kafka topics to S3 + +**Configuration**: +```json +{ + "name": "s3-sink-events", + "config": { + "connector.class": "io.confluent.connect.s3.S3SinkConnector", + "tasks.max": "3", + "topics": "user-events,order-events", + "s3.region": "us-east-1", + "s3.bucket.name": "my-kafka-archive", + "s3.part.size": "5242880", + "flush.size": "1000", + "rotate.interval.ms": "60000", + "rotate.schedule.interval.ms": "3600000", + "timezone": "UTC", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", + "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH", + "locale": "US", + "timestamp.extractor": "Record" + } +} +``` + +**Partitioning** (S3 folder structure): +``` +s3://my-kafka-archive/ + topics/user-events/year=2025/month=01/day=15/hour=10/ + user-events+0+0000000000.json + user-events+0+0000001000.json + topics/order-events/year=2025/month=01/day=15/hour=10/ + order-events+0+0000000000.json +``` + +### Pattern 5: Elasticsearch Sink (Kafka → Elasticsearch) + +**Use Case**: Index Kafka events for search + +**Configuration**: +```json +{ + "name": "elasticsearch-sink-logs", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "3", + "topics": "application-logs", + "connection.url": "http://localhost:9200", + "connection.username": "elastic", + "connection.password": "password", + "key.ignore": "true", + "schema.ignore": "true", + "type.name": "_doc", + "index.write.wait_for_active_shards": "1" + } +} +``` + +## Single Message Transforms (SMTs) + +### Transform 1: Mask Sensitive Fields + +**Use Case**: Hide email/phone in Kafka topics + +**Configuration**: +```json +{ + "transforms": "maskEmail", + "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value", + "transforms.maskEmail.fields": "email,phone" +} +``` + +**Before**: +```json +{"id": 1, "name": "John", "email": "john@example.com", "phone": "555-1234"} +``` + +**After**: +```json +{"id": 1, "name": "John", "email": null, "phone": null} +``` + +### Transform 2: Add Timestamp + +**Use Case**: Add processing timestamp to all messages + +**Configuration**: +```json +{ + "transforms": "insertTimestamp", + "transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", + "transforms.insertTimestamp.timestamp.field": "processed_at" +} +``` + +### Transform 3: Route by Field Value + +**Use Case**: Route high-value orders to separate topic + +**Configuration**: +```json +{ + "transforms": "routeByValue", + "transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.routeByValue.regex": "(.*)", + "transforms.routeByValue.replacement": "$1-high-value", + "transforms.routeByValue.predicate": "isHighValue", + "predicates": "isHighValue", + "predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", + "predicates.isHighValue.pattern": "orders" +} +``` + +### Transform 4: Flatten Nested JSON + +**Use Case**: Flatten nested structures for JDBC sink + +**Configuration**: +```json +{ + "transforms": "flatten", + 
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value", + "transforms.flatten.delimiter": "_" +} +``` + +**Before**: +```json +{ + "user": { + "id": 1, + "profile": { + "name": "John", + "email": "john@example.com" + } + } +} +``` + +**After**: +```json +{ + "user_id": 1, + "user_profile_name": "John", + "user_profile_email": "john@example.com" +} +``` + +## Best Practices + +### 1. Use Idempotent Connectors + +✅ **DO**: +```json +// JDBC Sink with upsert mode +{ + "insert.mode": "upsert", + "pk.mode": "record_value", + "pk.fields": "id" +} +``` + +❌ **DON'T**: +```json +// WRONG: insert mode (duplicates on restart!) +{ + "insert.mode": "insert" +} +``` + +### 2. Monitor Connector Status + +```bash +# Check connector status +curl http://localhost:8083/connectors/jdbc-source-users/status + +# Check task status +curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status +``` + +### 3. Use Schema Registry + +✅ **DO**: +```json +{ + "value.converter": "io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url": "http://localhost:8081" +} +``` + +### 4. Configure Error Handling + +```json +{ + "errors.tolerance": "all", + "errors.log.enable": "true", + "errors.log.include.messages": "true", + "errors.deadletterqueue.topic.name": "dlq-jdbc-sink", + "errors.deadletterqueue.context.headers.enable": "true" +} +``` + +## Connector Management + +### Deploy Connector + +```bash +# Create connector via REST API +curl -X POST http://localhost:8083/connectors \ + -H "Content-Type: application/json" \ + -d @jdbc-source.json + +# Update connector +curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \ + -H "Content-Type: application/json" \ + -d @jdbc-source.json +``` + +### Monitor Connectors + +```bash +# List all connectors +curl http://localhost:8083/connectors + +# Get connector info +curl http://localhost:8083/connectors/jdbc-source-users + +# Get connector status +curl http://localhost:8083/connectors/jdbc-source-users/status + +# Get connector tasks +curl http://localhost:8083/connectors/jdbc-source-users/tasks +``` + +### Pause/Resume Connectors + +```bash +# Pause connector +curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause + +# Resume connector +curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume + +# Restart connector +curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart + +# Restart task +curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart +``` + +## Common Issues & Solutions + +### Issue 1: Connector Task Failed + +**Symptoms**: Task state = FAILED + +**Solutions**: +1. Check connector logs: `docker logs connect-worker` +2. Validate configuration: `curl http://localhost:8083/connector-plugins//config/validate` +3. 
Restart task: `curl -X POST .../tasks/0/restart` + +### Issue 2: Schema Evolution Error + +**Error**: `Incompatible schema detected` + +**Solution**: Enable auto-evolution: +```json +{ + "auto.create": "true", + "auto.evolve": "true" +} +``` + +### Issue 3: JDBC Connection Pool Exhausted + +**Error**: `Could not get JDBC connection` + +**Solution**: Increase pool size: +```json +{ + "connection.attempts": "3", + "connection.backoff.ms": "10000" +} +``` + +## References + +- Kafka Connect Documentation: https://kafka.apache.org/documentation/#connect +- Confluent Hub: https://www.confluent.io/hub/ +- Debezium Documentation: https://debezium.io/documentation/ +- Transform Reference: https://kafka.apache.org/documentation/#connect_transforms + +--- + +**Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!** diff --git a/skills/confluent-ksqldb/SKILL.md b/skills/confluent-ksqldb/SKILL.md new file mode 100644 index 0000000..a0db6c6 --- /dev/null +++ b/skills/confluent-ksqldb/SKILL.md @@ -0,0 +1,470 @@ +--- +name: confluent-ksqldb +description: ksqlDB stream processing expert. Covers SQL-like queries on Kafka topics, stream and table concepts, joins, aggregations, windowing, materialized views, and real-time data transformations. Activates for ksqldb, ksql, stream processing, kafka sql, real-time analytics, windowing, stream joins, table joins, materialized views. +--- + +# Confluent ksqlDB Skill + +Expert knowledge of ksqlDB - Confluent's event streaming database for building real-time applications with SQL-like queries on Kafka topics. + +## What I Know + +### Core Concepts + +**Streams** (Unbounded, Append-Only): +- Represents immutable event sequences +- Every row is a new event +- Cannot be updated or deleted +- Example: Click events, sensor readings, transactions + +**Tables** (Mutable, Latest State): +- Represents current state +- Updates override previous values (by key) +- Compacted topic under the hood +- Example: User profiles, product inventory, account balances + +**Key Difference**: +```sql +-- STREAM: Every event is independent +INSERT INTO clicks_stream (user_id, page, timestamp) +VALUES (1, 'homepage', CURRENT_TIMESTAMP()); +-- Creates NEW row + +-- TABLE: Latest value wins (by key) +INSERT INTO users_table (user_id, name, email) +VALUES (1, 'John', 'john@example.com'); +-- UPDATES existing row with user_id=1 +``` + +### Query Types + +**1. Streaming Queries** (Continuous, Real-Time): +```sql +-- Filter events in real-time +SELECT user_id, page, timestamp +FROM clicks_stream +WHERE page = 'checkout' +EMIT CHANGES; + +-- Transform on the fly +SELECT + user_id, + UPPER(page) AS page_upper, + TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd') AS date +FROM clicks_stream +EMIT CHANGES; +``` + +**2. Materialized Views** (Pre-Computed Tables): +```sql +-- Aggregate clicks per user (updates continuously) +CREATE TABLE user_click_counts AS +SELECT + user_id, + COUNT(*) AS click_count +FROM clicks_stream +GROUP BY user_id +EMIT CHANGES; + +-- Query the table (instant results!) +SELECT * FROM user_click_counts WHERE user_id = 123; +``` + +**3. 
Pull Queries** (Point-in-Time Reads): +```sql +-- Query current state (like traditional SQL) +SELECT * FROM users_table WHERE user_id = 123; + +-- No EMIT CHANGES = pull query (returns once) +``` + +## When to Use This Skill + +Activate me when you need help with: +- ksqlDB syntax ("How to create ksqlDB stream?") +- Stream vs table concepts ("When to use stream vs table?") +- Joins ("Join stream with table") +- Aggregations ("Count events per user") +- Windowing ("Tumbling window aggregation") +- Real-time transformations ("Filter and enrich events") +- Materialized views ("Create pre-computed aggregates") + +## Common Patterns + +### Pattern 1: Filter Events + +**Use Case**: Drop irrelevant events early + +```sql +-- Create filtered stream +CREATE STREAM important_clicks AS +SELECT * +FROM clicks_stream +WHERE page IN ('checkout', 'payment', 'confirmation') +EMIT CHANGES; +``` + +### Pattern 2: Enrich Events (Stream-Table Join) + +**Use Case**: Add user details to click events + +```sql +-- Users table (current state) +CREATE TABLE users ( + user_id BIGINT PRIMARY KEY, + name VARCHAR, + email VARCHAR +) WITH ( + kafka_topic='users', + value_format='AVRO' +); + +-- Enrich clicks with user data +CREATE STREAM enriched_clicks AS +SELECT + c.user_id, + c.page, + c.timestamp, + u.name, + u.email +FROM clicks_stream c +LEFT JOIN users u ON c.user_id = u.user_id +EMIT CHANGES; +``` + +### Pattern 3: Real-Time Aggregation + +**Use Case**: Count events per user, per 5-minute window + +```sql +CREATE TABLE user_clicks_per_5min AS +SELECT + user_id, + WINDOWSTART AS window_start, + WINDOWEND AS window_end, + COUNT(*) AS click_count +FROM clicks_stream +WINDOW TUMBLING (SIZE 5 MINUTES) +GROUP BY user_id +EMIT CHANGES; + +-- Query current window +SELECT * FROM user_clicks_per_5min +WHERE user_id = 123 +AND window_start >= NOW() - INTERVAL 5 MINUTES; +``` + +### Pattern 4: Detect Anomalies + +**Use Case**: Alert when user clicks >100 times in 1 minute + +```sql +CREATE STREAM high_click_alerts AS +SELECT + user_id, + COUNT(*) AS click_count +FROM clicks_stream +WINDOW TUMBLING (SIZE 1 MINUTE) +GROUP BY user_id +HAVING COUNT(*) > 100 +EMIT CHANGES; +``` + +### Pattern 5: Change Data Capture (CDC) + +**Use Case**: Track changes to user table + +```sql +-- Create table from CDC topic (Debezium) +CREATE TABLE users_cdc ( + user_id BIGINT PRIMARY KEY, + name VARCHAR, + email VARCHAR, + op VARCHAR -- INSERT, UPDATE, DELETE +) WITH ( + kafka_topic='mysql.users.cdc', + value_format='AVRO' +); + +-- Stream of changes only +CREATE STREAM user_changes AS +SELECT * FROM users_cdc +WHERE op IN ('UPDATE', 'DELETE') +EMIT CHANGES; +``` + +## Join Types + +### 1. Stream-Stream Join + +**Use Case**: Correlate related events within time window + +```sql +-- Join page views with clicks within 10 minutes +CREATE STREAM page_view_with_clicks AS +SELECT + v.user_id, + v.page AS viewed_page, + c.page AS clicked_page +FROM page_views v +INNER JOIN clicks c WITHIN 10 MINUTES +ON v.user_id = c.user_id +EMIT CHANGES; +``` + +**Window Types**: +- `WITHIN 10 MINUTES` - Events must be within 10 minutes of each other +- `GRACE PERIOD 5 MINUTES` - Late-arriving events accepted for 5 more minutes + +### 2. Stream-Table Join + +**Use Case**: Enrich events with current state + +```sql +-- Add product details to order events +CREATE STREAM enriched_orders AS +SELECT + o.order_id, + o.product_id, + p.product_name, + p.price +FROM orders_stream o +LEFT JOIN products_table p ON o.product_id = p.product_id +EMIT CHANGES; +``` + +### 3. 
Table-Table Join + +**Use Case**: Combine two tables (latest state) + +```sql +-- Join users with their current cart +CREATE TABLE user_with_cart AS +SELECT + u.user_id, + u.name, + c.cart_total +FROM users u +LEFT JOIN shopping_carts c ON u.user_id = c.user_id +EMIT CHANGES; +``` + +## Windowing Types + +### Tumbling Window (Non-Overlapping) + +**Use Case**: Aggregate per fixed time period + +```sql +-- Count events every 5 minutes +SELECT + user_id, + COUNT(*) AS event_count +FROM events +WINDOW TUMBLING (SIZE 5 MINUTES) +GROUP BY user_id; + +-- Windows: [0:00-0:05), [0:05-0:10), [0:10-0:15) +``` + +### Hopping Window (Overlapping) + +**Use Case**: Moving average over time + +```sql +-- Count events in 10-minute windows, advancing every 5 minutes +SELECT + user_id, + COUNT(*) AS event_count +FROM events +WINDOW HOPPING (SIZE 10 MINUTES, ADVANCE BY 5 MINUTES) +GROUP BY user_id; + +-- Windows: [0:00-0:10), [0:05-0:15), [0:10-0:20) +``` + +### Session Window (Event-Based) + +**Use Case**: Group events by user session (gap-based) + +```sql +-- Session ends after 30 minutes of inactivity +SELECT + user_id, + COUNT(*) AS session_events +FROM events +WINDOW SESSION (30 MINUTES) +GROUP BY user_id; +``` + +## Best Practices + +### 1. Use Appropriate Data Types + +✅ **DO**: +```sql +CREATE STREAM orders ( + order_id BIGINT, + user_id BIGINT, + total DECIMAL(10, 2), -- Precise currency + timestamp TIMESTAMP +); +``` + +❌ **DON'T**: +```sql +-- WRONG: Using DOUBLE for currency (precision loss!) +total DOUBLE +``` + +### 2. Always Specify Keys + +✅ **DO**: +```sql +CREATE TABLE users ( + user_id BIGINT PRIMARY KEY, -- Explicit key + name VARCHAR +) WITH (kafka_topic='users'); +``` + +❌ **DON'T**: +```sql +-- WRONG: No key specified (can't join!) +CREATE TABLE users ( + user_id BIGINT, + name VARCHAR +); +``` + +### 3. Use Windowing for Aggregations + +✅ **DO**: +```sql +-- Windowed aggregation (bounded memory) +SELECT COUNT(*) FROM events +WINDOW TUMBLING (SIZE 1 HOUR) +GROUP BY user_id; +``` + +❌ **DON'T**: +```sql +-- WRONG: Non-windowed aggregation (unbounded memory!) +SELECT COUNT(*) FROM events GROUP BY user_id; +``` + +### 4. Set Retention Policies + +```sql +-- Limit table size (keep last 7 days) +CREATE TABLE user_stats ( + user_id BIGINT PRIMARY KEY, + click_count BIGINT +) WITH ( + kafka_topic='user_stats', + retention_ms=604800000 -- 7 days +); +``` + +## Performance Optimization + +### 1. Partition Alignment + +**Ensure joined streams/tables have same partition key**: + +```sql +-- GOOD: Both keyed by user_id (co-partitioned) +CREATE STREAM clicks (user_id BIGINT KEY, ...) +CREATE TABLE users (user_id BIGINT PRIMARY KEY, ...) + +-- Join works efficiently (no repartitioning) +SELECT * FROM clicks c +JOIN users u ON c.user_id = u.user_id; +``` + +### 2. Use Materialized Views + +**Pre-compute expensive queries**: + +```sql +-- BAD: Compute on every request +SELECT COUNT(*) FROM orders WHERE user_id = 123; + +-- GOOD: Materialized table (instant lookup) +CREATE TABLE user_order_counts AS +SELECT user_id, COUNT(*) AS order_count +FROM orders GROUP BY user_id; + +-- Query is now instant +SELECT order_count FROM user_order_counts WHERE user_id = 123; +``` + +### 3. Filter Early + +```sql +-- GOOD: Filter before join +CREATE STREAM important_events AS +SELECT * FROM events WHERE event_type = 'purchase'; + +SELECT * FROM important_events e +JOIN users u ON e.user_id = u.user_id; + +-- BAD: Join first, filter later (processes all events!) 
+SELECT * FROM events e +JOIN users u ON e.user_id = u.user_id +WHERE e.event_type = 'purchase'; +``` + +## Common Issues & Solutions + +### Issue 1: Query Timing Out + +**Error**: Query timed out + +**Root Cause**: Non-windowed aggregation on large stream + +**Solution**: Add time window: +```sql +-- WRONG +SELECT COUNT(*) FROM events GROUP BY user_id; + +-- RIGHT +SELECT COUNT(*) FROM events +WINDOW TUMBLING (SIZE 1 HOUR) +GROUP BY user_id; +``` + +### Issue 2: Partition Mismatch + +**Error**: Cannot join streams (different partition keys) + +**Solution**: Repartition stream: +```sql +-- Repartition stream by user_id +CREATE STREAM clicks_by_user AS +SELECT * FROM clicks PARTITION BY user_id; + +-- Now join works +SELECT * FROM clicks_by_user c +JOIN users u ON c.user_id = u.user_id; +``` + +### Issue 3: Late-Arriving Events + +**Solution**: Use grace period: +```sql +SELECT COUNT(*) FROM events +WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 1 MINUTE) +GROUP BY user_id; +-- Accepts events up to 1 minute late +``` + +## References + +- ksqlDB Documentation: https://docs.ksqldb.io/ +- ksqlDB Tutorials: https://kafka-tutorials.confluent.io/ +- Windowing Guide: https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/ +- Join Types: https://docs.ksqldb.io/en/latest/developer-guide/joins/ + +--- + +**Invoke me when you need stream processing, real-time analytics, or SQL-like queries on Kafka!** diff --git a/skills/confluent-schema-registry/SKILL.md b/skills/confluent-schema-registry/SKILL.md new file mode 100644 index 0000000..257fb62 --- /dev/null +++ b/skills/confluent-schema-registry/SKILL.md @@ -0,0 +1,316 @@ +--- +name: confluent-schema-registry +description: Schema Registry expert for Avro, Protobuf, and JSON Schema management. Covers schema evolution strategies, compatibility modes, validation, and best practices for managing schemas in Confluent Cloud and self-hosted Schema Registry. Activates for schema registry, avro, protobuf, json schema, schema evolution, compatibility modes, schema validation. +--- + +# Confluent Schema Registry Skill + +Expert knowledge of Confluent Schema Registry for managing Avro, Protobuf, and JSON Schema schemas in Kafka ecosystems. + +## What I Know + +### Schema Formats + +**Avro** (Most Popular): +- Binary serialization format +- Schema evolution support +- Smaller message size vs JSON +- Self-describing with schema ID in header +- Best for: High-throughput applications, data warehousing + +**Protobuf** (Google Protocol Buffers): +- Binary serialization +- Strong typing with .proto files +- Language-agnostic (Java, Python, Go, C++, etc.) 
+- Efficient encoding +- Best for: Polyglot environments, gRPC integration + +**JSON Schema**: +- Human-readable text format +- Easy debugging +- Widely supported +- Larger message size +- Best for: Development, debugging, REST APIs + +### Compatibility Modes + +| Mode | Producer Can | Consumer Can | Use Case | +|------|-------------|-------------|----------| +| **BACKWARD** | Remove fields, add optional fields | Read old data with new schema | Most common, safe for consumers | +| **FORWARD** | Add fields, remove optional fields | Read new data with old schema | Safe for producers | +| **FULL** | Add/remove optional fields only | Bi-directional compatibility | Both producers and consumers upgrade independently | +| **NONE** | Any change | Must coordinate upgrades | Development only, NOT production | +| **BACKWARD_TRANSITIVE** | BACKWARD across all versions | Read any old data | Strictest backward compatibility | +| **FORWARD_TRANSITIVE** | FORWARD across all versions | Read any new data | Strictest forward compatibility | +| **FULL_TRANSITIVE** | FULL across all versions | Complete bi-directional | Strictest overall | + +**Default**: `BACKWARD` (recommended for production) + +### Schema Evolution Strategies + +**Adding Fields**: +```avro +// V1 +{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "long"}, + {"name": "name", "type": "string"} + ] +} + +// V2 - BACKWARD compatible (added optional field with default) +{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "long"}, + {"name": "name", "type": "string"}, + {"name": "email", "type": ["null", "string"], "default": null} + ] +} +``` + +**Removing Fields** (BACKWARD compatible): +```avro +// V1 +{"name": "address", "type": "string"} + +// V2 - Remove field (old consumers will ignore it) +// Field removed from schema +``` + +**Changing Field Types** (Breaking Change!): +```avro +// ❌ BREAKING - Cannot change string to int +{"name": "age", "type": "string"} → {"name": "age", "type": "int"} + +// ✅ SAFE - Use union types +{"name": "age", "type": ["string", "int"], "default": "unknown"} +``` + +## When to Use This Skill + +Activate me when you need help with: +- Schema evolution strategies ("How do I evolve my Avro schema?") +- Compatibility mode selection ("Which compatibility mode for production?") +- Schema validation ("Validate my Avro schema") +- Best practices ("Schema Registry best practices") +- Schema registration ("Register Avro schema with Schema Registry") +- Debugging schema issues ("Schema compatibility error") +- Format comparison ("Avro vs Protobuf vs JSON Schema") + +## Best Practices + +### 1. Always Use Compatible Evolution + +✅ **DO**: +- Add optional fields with defaults +- Remove optional fields +- Use union types for flexibility +- Test schema changes in staging first + +❌ **DON'T**: +- Change field types +- Remove required fields +- Rename fields (add new + deprecate old) +- Use `NONE` compatibility in production + +### 2. Schema Naming Conventions + +**Hierarchical Namespaces**: +``` +com.company.domain.EntityName +com.acme.ecommerce.Order +com.acme.ecommerce.OrderLineItem +``` + +**Subject Naming** (Kafka topics): +- `-value` - For record values +- `-key` - For record keys +- Example: `orders-value`, `orders-key` + +### 3. 
Schema Registry Configuration + +**Producer** (with Avro): +```javascript +const { Kafka } = require('kafkajs'); +const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry'); + +const registry = new SchemaRegistry({ + host: 'https://schema-registry:8081', + auth: { + username: 'SR_API_KEY', + password: 'SR_API_SECRET' + } +}); + +// Register schema +const schema = ` +{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "long"}, + {"name": "name", "type": "string"} + ] +} +`; + +const { id } = await registry.register({ + type: SchemaType.AVRO, + schema +}); + +// Encode message with schema +const payload = await registry.encode(id, { + id: 1, + name: 'John Doe' +}); + +await producer.send({ + topic: 'users', + messages: [{ value: payload }] +}); +``` + +**Consumer** (with Avro): +```javascript +const consumer = kafka.consumer({ groupId: 'user-processor' }); + +await consumer.subscribe({ topic: 'users' }); + +await consumer.run({ + eachMessage: async ({ message }) => { + // Decode message (schema ID is in header) + const decodedMessage = await registry.decode(message.value); + console.log(decodedMessage); // { id: 1, name: 'John Doe' } + } +}); +``` + +### 4. Schema Validation Workflow + +**Before Registering**: +1. Validate schema syntax (Avro JSON, .proto, JSON Schema) +2. Check compatibility with existing versions +3. Test with sample data +4. Register in dev/staging first +5. Deploy to production after validation + +**CLI Validation**: +```bash +# Check compatibility (before registering) +curl -X POST http://localhost:8081/compatibility/subjects/users-value/versions/latest \ + -H "Content-Type: application/vnd.schemaregistry.v1+json" \ + -d '{"schema": "{...}"}' + +# Register schema +curl -X POST http://localhost:8081/subjects/users-value/versions \ + -H "Content-Type: application/vnd.schemaregistry.v1+json" \ + -d '{"schema": "{...}"}' +``` + +## Common Issues & Solutions + +### Issue 1: Schema Compatibility Error + +**Error**: +``` +Schema being registered is incompatible with an earlier schema +``` + +**Root Cause**: Violates compatibility mode (e.g., removed required field with BACKWARD mode) + +**Solution**: +1. Check current compatibility mode: + ```bash + curl http://localhost:8081/config/users-value + ``` +2. Fix schema to be compatible OR change mode (carefully!) +3. Validate before registering: + ```bash + curl -X POST http://localhost:8081/compatibility/subjects/users-value/versions/latest \ + -d '{"schema": "{...}"}' + ``` + +### Issue 2: Schema Not Found + +**Error**: +``` +Subject 'users-value' not found +``` + +**Root Cause**: Schema not registered yet OR wrong subject name + +**Solution**: +1. List all subjects: + ```bash + curl http://localhost:8081/subjects + ``` +2. Register schema if missing +3. Check subject naming convention (`-key` or `-value`) + +### Issue 3: Message Deserialization Failed + +**Error**: +``` +Unknown magic byte! +``` + +**Root Cause**: Message not encoded with Schema Registry (missing magic byte + schema ID) + +**Solution**: +1. Ensure producer uses Schema Registry encoder +2. Check message format: [magic_byte(1) + schema_id(4) + payload] +3. Use `@kafkajs/confluent-schema-registry` library + +## Schema Evolution Decision Tree + +``` +Need to change schema? +├─ Adding new field? +│ ├─ Required field? → Add with default value (BACKWARD) +│ └─ Optional field? → Add with default null (BACKWARD) +│ +├─ Removing field? +│ ├─ Required field? → ❌ BREAKING CHANGE (coordinate upgrade) +│ └─ Optional field? 
→ ✅ BACKWARD compatible +│ +├─ Changing field type? +│ ├─ Compatible types (e.g., int → long)? → Use union types +│ └─ Incompatible types? → ❌ BREAKING CHANGE (add new field, deprecate old) +│ +└─ Renaming field? + └─ ❌ BREAKING CHANGE → Add new field + mark old as deprecated +``` + +## Avro vs Protobuf vs JSON Schema Comparison + +| Feature | Avro | Protobuf | JSON Schema | +|---------|------|----------|-------------| +| **Encoding** | Binary | Binary | Text (JSON) | +| **Message Size** | Small (90% smaller) | Small (80% smaller) | Large (baseline) | +| **Human Readable** | No | No | Yes | +| **Schema Evolution** | Excellent | Good | Fair | +| **Language Support** | Java, Python, C++ | 20+ languages | Universal | +| **Performance** | Very Fast | Very Fast | Slower | +| **Debugging** | Harder | Harder | Easy | +| **Best For** | Data warehousing, ETL | Polyglot, gRPC | REST APIs, dev | + +**Recommendation**: +- **Production**: Avro (best balance) +- **Polyglot teams**: Protobuf +- **Development/Debugging**: JSON Schema + +## References + +- Schema Registry REST API: https://docs.confluent.io/platform/current/schema-registry/develop/api.html +- Avro Specification: https://avro.apache.org/docs/current/spec.html +- Protobuf Guide: https://developers.google.com/protocol-buffers +- JSON Schema Spec: https://json-schema.org/ + +--- + +**Invoke me when you need schema management, evolution strategies, or compatibility guidance!**
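As a closing illustration of the BACKWARD-compatible evolution path above, here is a minimal sketch using the `@kafkajs/confluent-schema-registry` client shown earlier; the `users-value` subject and registry URL are assumptions for the example:

```typescript
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry";

const registry = new SchemaRegistry({ host: "http://localhost:8081" });

// V2 of the User schema: adds an optional field with a default value, which
// keeps BACKWARD compatibility (old records remain readable with the new schema).
const userV2 = JSON.stringify({
  type: "record",
  name: "User",
  namespace: "com.example",
  fields: [
    { name: "id", type: "long" },
    { name: "name", type: "string" },
    { name: "email", type: ["null", "string"], default: null }, // new optional field
  ],
});

// Registering against the existing subject triggers the registry's compatibility
// check; an incompatible change is rejected with an error instead of a new version.
const { id } = await registry.register(
  { type: SchemaType.AVRO, schema: userV2 },
  { subject: "users-value" } // assumed subject, following the `-value` naming convention
);
console.log(`Registered User V2 as schema id ${id}`);
```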