Initial commit

Zhongwei Li
2025-11-29 17:56:21 +08:00
commit 1d1e040e9d
10 changed files with 2114 additions and 0 deletions


@@ -0,0 +1,453 @@
---
name: confluent-kafka-connect
description: Kafka Connect integration expert. Covers source and sink connectors, JDBC, Elasticsearch, S3, Debezium CDC, SMT (Single Message Transforms), connector configuration, and data pipeline patterns. Activates for kafka connect, connectors, source connector, sink connector, jdbc connector, debezium, smt, data pipeline, cdc.
---
# Confluent Kafka Connect Skill
Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.
## What I Know
### Connector Types
**Source Connectors** (External System → Kafka):
- JDBC Source: Databases → Kafka
- Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka
- S3 Source: AWS S3 files → Kafka
- File Source: Local files → Kafka
**Sink Connectors** (Kafka → External System):
- JDBC Sink: Kafka → Databases
- Elasticsearch Sink: Kafka → Elasticsearch
- S3 Sink: Kafka → AWS S3
- HDFS Sink: Kafka → Hadoop HDFS
**Single Message Transforms (SMTs)**:
- Field operations: Insert, Mask, Replace, TimestampConverter
- Routing: RegexRouter, TimestampRouter
- Filtering: Filter, Predicates
## When to Use This Skill
Activate me when you need help with:
- Connector setup ("Configure JDBC connector")
- CDC patterns ("Debezium MySQL CDC")
- Data pipelines ("Stream database changes to Kafka")
- SMT transforms ("Mask sensitive fields")
- Connector troubleshooting ("Connector task failed")
## Common Patterns
### Pattern 1: JDBC Source (Database → Kafka)
**Use Case**: Stream database table changes to Kafka
**Configuration**:
```json
{
"name": "jdbc-source-users",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "postgres",
"connection.password": "password",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "postgres-",
"table.whitelist": "users,orders",
"poll.interval.ms": "5000"
}
}
```
**Modes**:
- `incrementing`: Track by auto-increment ID
- `timestamp`: Track by timestamp column
- `timestamp+incrementing`: Both (most reliable)
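A hedged sketch of the most reliable mode — `timestamp+incrementing` — where the column names `updated_at` and `id` are assumptions for illustration:
```json
{
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"validate.non.null": "true"
}
```
The timestamp column catches updates to existing rows, while the incrementing column guarantees unique offsets for rows sharing a timestamp.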
### Pattern 2: Debezium CDC (MySQL → Kafka)
**Use Case**: Capture all database changes (INSERT/UPDATE/DELETE)
**Configuration**:
```json
{
"name": "debezium-mysql-cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1",
"topic.prefix": "mysql",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.orders",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.mydb"
}
}
```
**Output Format** (Debezium Envelope):
```json
{
"before": null,
"after": {
"id": 1,
"name": "John Doe",
"email": "john@example.com"
},
"source": {
"version": "1.9.0",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1620000000000,
"snapshot": "false",
"db": "mydb",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 12345,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1620000000000
}
```
`op` codes: `c` = create, `u` = update, `d` = delete, `r` = read (snapshot).
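On the consumer side, the `op` code determines how to apply each change. A minimal sketch in JavaScript (the `applyChange` helper and its return shape are illustrative, not part of Debezium):

```javascript
// Interpret a Debezium change event: creates/reads/updates carry the new row
// in "after"; deletes carry the old row in "before".
function applyChange(envelope) {
  switch (envelope.op) {
    case 'c': // CREATE
    case 'r': // READ (initial snapshot)
    case 'u': // UPDATE
      return { action: 'upsert', row: envelope.after };
    case 'd': // DELETE
      return { action: 'delete', row: envelope.before };
    default:
      throw new Error(`Unknown op: ${envelope.op}`);
  }
}
```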
### Pattern 3: JDBC Sink (Kafka → Database)
**Use Case**: Write Kafka events to PostgreSQL
**Configuration**:
```json
{
"name": "jdbc-sink-enriched-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "3",
"topics": "enriched-orders",
"connection.url": "jdbc:postgresql://localhost:5432/analytics",
"connection.user": "postgres",
"connection.password": "password",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "order_id",
"table.name.format": "orders_${topic}"
}
}
```
**Insert Modes**:
- `insert`: Append only (fails on duplicate)
- `update`: Update only (requires PK)
- `upsert`: INSERT or UPDATE (recommended)
### Pattern 4: S3 Sink (Kafka → AWS S3)
**Use Case**: Archive Kafka topics to S3
**Configuration**:
```json
{
"name": "s3-sink-events",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "user-events,order-events",
"s3.region": "us-east-1",
"s3.bucket.name": "my-kafka-archive",
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "60000",
"rotate.schedule.interval.ms": "3600000",
"timezone": "UTC",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timestamp.extractor": "Record"
}
}
```
**Partitioning** (S3 folder structure):
```
s3://my-kafka-archive/
topics/user-events/year=2025/month=01/day=15/hour=10/
user-events+0+0000000000.json
user-events+0+0000001000.json
topics/order-events/year=2025/month=01/day=15/hour=10/
order-events+0+0000000000.json
```
### Pattern 5: Elasticsearch Sink (Kafka → Elasticsearch)
**Use Case**: Index Kafka events for search
**Configuration**:
```json
{
"name": "elasticsearch-sink-logs",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "3",
"topics": "application-logs",
"connection.url": "http://localhost:9200",
"connection.username": "elastic",
"connection.password": "password",
"key.ignore": "true",
"schema.ignore": "true",
"type.name": "_doc",
"index.write.wait_for_active_shards": "1"
}
}
```
## Single Message Transforms (SMTs)
### Transform 1: Mask Sensitive Fields
**Use Case**: Hide email/phone in Kafka topics
**Configuration**:
```json
{
"transforms": "maskEmail",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email,phone"
}
```
**Before**:
```json
{"id": 1, "name": "John", "email": "john@example.com", "phone": "555-1234"}
```
**After**:
```json
{"id": 1, "name": "John", "email": null, "phone": null}
```
### Transform 2: Add Timestamp
**Use Case**: Add processing timestamp to all messages
**Configuration**:
```json
{
"transforms": "insertTimestamp",
"transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTimestamp.timestamp.field": "processed_at"
}
```
### Transform 3: Conditional Topic Routing
**Use Case**: Rename the destination topic for records on matching topics. Note the built-in predicates match on topic name, header key, or tombstone status; routing on a field *value* requires a custom predicate or a stream processor.
**Configuration**:
```json
{
"transforms": "routeByValue",
"transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByValue.regex": "(.*)",
"transforms.routeByValue.replacement": "$1-high-value",
"transforms.routeByValue.predicate": "isHighValue",
"predicates": "isHighValue",
"predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHighValue.pattern": "orders"
}
```
### Transform 4: Flatten Nested JSON
**Use Case**: Flatten nested structures for JDBC sink
**Configuration**:
```json
{
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
```
**Before**:
```json
{
"user": {
"id": 1,
"profile": {
"name": "John",
"email": "john@example.com"
}
}
}
```
**After**:
```json
{
"user_id": 1,
"user_profile_name": "John",
"user_profile_email": "john@example.com"
}
```
## Best Practices
### 1. Use Idempotent Connectors
**DO**:
```json
{
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "id"
}
```
**DON'T**:
```json
{
"insert.mode": "insert"
}
```
Plain `insert` mode produces duplicate rows when a task restarts and reprocesses already-delivered records.
### 2. Monitor Connector Status
```bash
# Check connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Check task status
curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status
```
### 3. Use Schema Registry
**DO**:
```json
{
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
```
### 4. Configure Error Handling
```json
{
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
"errors.deadletterqueue.context.headers.enable": "true"
}
```
## Connector Management
### Deploy Connector
```bash
# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @jdbc-source.json
# Update connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
-H "Content-Type: application/json" \
-d @jdbc-source.json
```
### Monitor Connectors
```bash
# List all connectors
curl http://localhost:8083/connectors
# Get connector info
curl http://localhost:8083/connectors/jdbc-source-users
# Get connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Get connector tasks
curl http://localhost:8083/connectors/jdbc-source-users/tasks
```
### Pause/Resume Connectors
```bash
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause
# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume
# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart
# Restart task
curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart
```
## Common Issues & Solutions
### Issue 1: Connector Task Failed
**Symptoms**: Task state = FAILED
**Solutions**:
1. Check connector logs: `docker logs connect-worker`
2. Validate configuration: `curl -X PUT http://localhost:8083/connector-plugins/<class>/config/validate -H "Content-Type: application/json" -d @config.json`
3. Restart task: `curl -X POST .../tasks/0/restart`
### Issue 2: Schema Evolution Error
**Error**: `Incompatible schema detected`
**Solution**: Enable auto-evolution:
```json
{
"auto.create": "true",
"auto.evolve": "true"
}
```
### Issue 3: JDBC Connection Failures
**Error**: `Could not get JDBC connection`
**Solution**: Configure connection retries and backoff (and verify the database allows enough concurrent connections for `tasks.max`):
```json
{
"connection.attempts": "3",
"connection.backoff.ms": "10000"
}
```
## References
- Kafka Connect Documentation: https://kafka.apache.org/documentation/#connect
- Confluent Hub: https://www.confluent.io/hub/
- Debezium Documentation: https://debezium.io/documentation/
- Transform Reference: https://kafka.apache.org/documentation/#connect_transforms
---
**Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!**


@@ -0,0 +1,470 @@
---
name: confluent-ksqldb
description: ksqlDB stream processing expert. Covers SQL-like queries on Kafka topics, stream and table concepts, joins, aggregations, windowing, materialized views, and real-time data transformations. Activates for ksqldb, ksql, stream processing, kafka sql, real-time analytics, windowing, stream joins, table joins, materialized views.
---
# Confluent ksqlDB Skill
Expert knowledge of ksqlDB - Confluent's event streaming database for building real-time applications with SQL-like queries on Kafka topics.
## What I Know
### Core Concepts
**Streams** (Unbounded, Append-Only):
- Represents immutable event sequences
- Every row is a new event
- Cannot be updated or deleted
- Example: Click events, sensor readings, transactions
**Tables** (Mutable, Latest State):
- Represents current state
- Updates override previous values (by key)
- Compacted topic under the hood
- Example: User profiles, product inventory, account balances
**Key Difference**:
```sql
-- STREAM: Every event is independent
INSERT INTO clicks_stream (user_id, page, timestamp)
VALUES (1, 'homepage', CURRENT_TIMESTAMP());
-- Creates NEW row
-- TABLE: Latest value wins (by key)
INSERT INTO users_table (user_id, name, email)
VALUES (1, 'John', 'john@example.com');
-- UPDATES existing row with user_id=1
```
### Query Types
**1. Streaming Queries** (Continuous, Real-Time):
```sql
-- Filter events in real-time
SELECT user_id, page, timestamp
FROM clicks_stream
WHERE page = 'checkout'
EMIT CHANGES;
-- Transform on the fly
SELECT
user_id,
UPPER(page) AS page_upper,
TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd') AS date
FROM clicks_stream
EMIT CHANGES;
```
**2. Materialized Views** (Pre-Computed Tables):
```sql
-- Aggregate clicks per user (updates continuously)
CREATE TABLE user_click_counts AS
SELECT
user_id,
COUNT(*) AS click_count
FROM clicks_stream
GROUP BY user_id
EMIT CHANGES;
-- Query the table (instant results!)
SELECT * FROM user_click_counts WHERE user_id = 123;
```
**3. Pull Queries** (Point-in-Time Reads):
```sql
-- Query current state (like traditional SQL)
SELECT * FROM users_table WHERE user_id = 123;
-- No EMIT CHANGES = pull query (returns once)
```
## When to Use This Skill
Activate me when you need help with:
- ksqlDB syntax ("How to create ksqlDB stream?")
- Stream vs table concepts ("When to use stream vs table?")
- Joins ("Join stream with table")
- Aggregations ("Count events per user")
- Windowing ("Tumbling window aggregation")
- Real-time transformations ("Filter and enrich events")
- Materialized views ("Create pre-computed aggregates")
## Common Patterns
### Pattern 1: Filter Events
**Use Case**: Drop irrelevant events early
```sql
-- Create filtered stream
CREATE STREAM important_clicks AS
SELECT *
FROM clicks_stream
WHERE page IN ('checkout', 'payment', 'confirmation')
EMIT CHANGES;
```
### Pattern 2: Enrich Events (Stream-Table Join)
**Use Case**: Add user details to click events
```sql
-- Users table (current state)
CREATE TABLE users (
user_id BIGINT PRIMARY KEY,
name VARCHAR,
email VARCHAR
) WITH (
kafka_topic='users',
value_format='AVRO'
);
-- Enrich clicks with user data
CREATE STREAM enriched_clicks AS
SELECT
c.user_id,
c.page,
c.timestamp,
u.name,
u.email
FROM clicks_stream c
LEFT JOIN users u ON c.user_id = u.user_id
EMIT CHANGES;
```
### Pattern 3: Real-Time Aggregation
**Use Case**: Count events per user, per 5-minute window
```sql
CREATE TABLE user_clicks_per_5min AS
SELECT
user_id,
WINDOWSTART AS window_start,
WINDOWEND AS window_end,
COUNT(*) AS click_count
FROM clicks_stream
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY user_id
EMIT CHANGES;
-- Query recent windows (pull query)
SELECT * FROM user_clicks_per_5min
WHERE user_id = 123
AND WINDOWSTART >= '2025-01-15T10:00:00';
```
### Pattern 4: Detect Anomalies
**Use Case**: Alert when user clicks >100 times in 1 minute
```sql
CREATE TABLE high_click_alerts AS
SELECT
user_id,
COUNT(*) AS click_count
FROM clicks_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY user_id
HAVING COUNT(*) > 100
EMIT CHANGES;
```
### Pattern 5: Change Data Capture (CDC)
**Use Case**: Track changes to user table
```sql
-- Create a stream over the CDC topic (Debezium)
CREATE STREAM users_cdc (
user_id BIGINT KEY,
name VARCHAR,
email VARCHAR,
op VARCHAR -- Debezium op codes: c=create, u=update, d=delete
) WITH (
kafka_topic='mysql.users.cdc',
value_format='AVRO'
);
-- Stream of updates and deletes only
CREATE STREAM user_changes AS
SELECT * FROM users_cdc
WHERE op IN ('u', 'd')
EMIT CHANGES;
```
## Join Types
### 1. Stream-Stream Join
**Use Case**: Correlate related events within time window
```sql
-- Join page views with clicks within 10 minutes
CREATE STREAM page_view_with_clicks AS
SELECT
v.user_id,
v.page AS viewed_page,
c.page AS clicked_page
FROM page_views v
INNER JOIN clicks c WITHIN 10 MINUTES
ON v.user_id = c.user_id
EMIT CHANGES;
```
**Window Types**:
- `WITHIN 10 MINUTES` - Events must be within 10 minutes of each other
- `GRACE PERIOD 5 MINUTES` - Late-arriving events accepted for 5 more minutes
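Combining both in one join (the exact `GRACE PERIOD` syntax in joins assumes a recent ksqlDB version):
```sql
CREATE STREAM page_view_clicks_graced AS
SELECT v.user_id, v.page AS viewed_page, c.page AS clicked_page
FROM page_views v
INNER JOIN clicks c WITHIN 10 MINUTES GRACE PERIOD 5 MINUTES
ON v.user_id = c.user_id
EMIT CHANGES;
```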
### 2. Stream-Table Join
**Use Case**: Enrich events with current state
```sql
-- Add product details to order events
CREATE STREAM enriched_orders AS
SELECT
o.order_id,
o.product_id,
p.product_name,
p.price
FROM orders_stream o
LEFT JOIN products_table p ON o.product_id = p.product_id
EMIT CHANGES;
```
### 3. Table-Table Join
**Use Case**: Combine two tables (latest state)
```sql
-- Join users with their current cart
CREATE TABLE user_with_cart AS
SELECT
u.user_id,
u.name,
c.cart_total
FROM users u
LEFT JOIN shopping_carts c ON u.user_id = c.user_id
EMIT CHANGES;
```
## Windowing Types
### Tumbling Window (Non-Overlapping)
**Use Case**: Aggregate per fixed time period
```sql
-- Count events every 5 minutes
SELECT
user_id,
COUNT(*) AS event_count
FROM events
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY user_id;
-- Windows: [0:00-0:05), [0:05-0:10), [0:10-0:15)
```
### Hopping Window (Overlapping)
**Use Case**: Moving average over time
```sql
-- Count events in 10-minute windows, advancing every 5 minutes
SELECT
user_id,
COUNT(*) AS event_count
FROM events
WINDOW HOPPING (SIZE 10 MINUTES, ADVANCE BY 5 MINUTES)
GROUP BY user_id;
-- Windows: [0:00-0:10), [0:05-0:15), [0:10-0:20)
```
### Session Window (Event-Based)
**Use Case**: Group events by user session (gap-based)
```sql
-- Session ends after 30 minutes of inactivity
SELECT
user_id,
COUNT(*) AS session_events
FROM events
WINDOW SESSION (30 MINUTES)
GROUP BY user_id;
```
## Best Practices
### 1. Use Appropriate Data Types
**DO**:
```sql
CREATE STREAM orders (
order_id BIGINT,
user_id BIGINT,
total DECIMAL(10, 2), -- Precise currency
timestamp TIMESTAMP
);
```
**DON'T**:
```sql
-- WRONG: Using DOUBLE for currency (precision loss!)
total DOUBLE
```
### 2. Always Specify Keys
**DO**:
```sql
CREATE TABLE users (
user_id BIGINT PRIMARY KEY, -- Explicit key
name VARCHAR
) WITH (kafka_topic='users');
```
**DON'T**:
```sql
-- WRONG: No key specified (can't join!)
CREATE TABLE users (
user_id BIGINT,
name VARCHAR
);
```
### 3. Use Windowing for Aggregations
**DO**:
```sql
-- Windowed aggregation (bounded memory)
SELECT COUNT(*) FROM events
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY user_id;
```
**DON'T**:
```sql
-- WRONG: Non-windowed aggregation (unbounded memory!)
SELECT COUNT(*) FROM events GROUP BY user_id;
```
### 4. Set Retention Policies
```sql
-- Limit table size (keep last 7 days)
CREATE TABLE user_stats (
user_id BIGINT PRIMARY KEY,
click_count BIGINT
) WITH (
kafka_topic='user_stats',
retention_ms=604800000 -- 7 days
);
```
## Performance Optimization
### 1. Partition Alignment
**Ensure joined streams/tables have same partition key**:
```sql
-- GOOD: Both keyed by user_id (co-partitioned)
CREATE STREAM clicks (user_id BIGINT KEY, ...)
CREATE TABLE users (user_id BIGINT PRIMARY KEY, ...)
-- Join works efficiently (no repartitioning)
SELECT * FROM clicks c
JOIN users u ON c.user_id = u.user_id;
```
### 2. Use Materialized Views
**Pre-compute expensive queries**:
```sql
-- BAD: Compute on every request
SELECT COUNT(*) FROM orders WHERE user_id = 123;
-- GOOD: Materialized table (instant lookup)
CREATE TABLE user_order_counts AS
SELECT user_id, COUNT(*) AS order_count
FROM orders GROUP BY user_id;
-- Query is now instant
SELECT order_count FROM user_order_counts WHERE user_id = 123;
```
### 3. Filter Early
```sql
-- GOOD: Filter before join
CREATE STREAM important_events AS
SELECT * FROM events WHERE event_type = 'purchase';
SELECT * FROM important_events e
JOIN users u ON e.user_id = u.user_id;
-- BAD: Join first, filter later (processes all events!)
SELECT * FROM events e
JOIN users u ON e.user_id = u.user_id
WHERE e.event_type = 'purchase';
```
## Common Issues & Solutions
### Issue 1: Query Timing Out
**Error**: Query timed out
**Root Cause**: Non-windowed aggregation on large stream
**Solution**: Add time window:
```sql
-- WRONG
SELECT COUNT(*) FROM events GROUP BY user_id;
-- RIGHT
SELECT COUNT(*) FROM events
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY user_id;
```
### Issue 2: Partition Mismatch
**Error**: Cannot join streams (different partition keys)
**Solution**: Repartition stream:
```sql
-- Repartition stream by user_id
CREATE STREAM clicks_by_user AS
SELECT * FROM clicks PARTITION BY user_id;
-- Now join works
SELECT * FROM clicks_by_user c
JOIN users u ON c.user_id = u.user_id;
```
### Issue 3: Late-Arriving Events
**Solution**: Use grace period:
```sql
SELECT COUNT(*) FROM events
WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 1 MINUTE)
GROUP BY user_id;
-- Accepts events up to 1 minute late
```
## References
- ksqlDB Documentation: https://docs.ksqldb.io/
- ksqlDB Tutorials: https://kafka-tutorials.confluent.io/
- Windowing Guide: https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/
- Join Types: https://docs.ksqldb.io/en/latest/developer-guide/joins/
---
**Invoke me when you need stream processing, real-time analytics, or SQL-like queries on Kafka!**


@@ -0,0 +1,316 @@
---
name: confluent-schema-registry
description: Schema Registry expert for Avro, Protobuf, and JSON Schema management. Covers schema evolution strategies, compatibility modes, validation, and best practices for managing schemas in Confluent Cloud and self-hosted Schema Registry. Activates for schema registry, avro, protobuf, json schema, schema evolution, compatibility modes, schema validation.
---
# Confluent Schema Registry Skill
Expert knowledge of Confluent Schema Registry for managing Avro, Protobuf, and JSON Schema schemas in Kafka ecosystems.
## What I Know
### Schema Formats
**Avro** (Most Popular):
- Binary serialization format
- Schema evolution support
- Smaller message size vs JSON
- Self-describing with schema ID in header
- Best for: High-throughput applications, data warehousing
**Protobuf** (Google Protocol Buffers):
- Binary serialization
- Strong typing with .proto files
- Language-agnostic (Java, Python, Go, C++, etc.)
- Efficient encoding
- Best for: Polyglot environments, gRPC integration
**JSON Schema**:
- Human-readable text format
- Easy debugging
- Widely supported
- Larger message size
- Best for: Development, debugging, REST APIs
### Compatibility Modes
| Mode | Producer Can | Consumer Can | Use Case |
|------|-------------|-------------|----------|
| **BACKWARD** | Remove fields, add optional fields | Read old data with new schema | Most common, safe for consumers |
| **FORWARD** | Add fields, remove optional fields | Read new data with old schema | Safe for producers |
| **FULL** | Add/remove optional fields only | Bi-directional compatibility | Both producers and consumers upgrade independently |
| **NONE** | Any change | Must coordinate upgrades | Development only, NOT production |
| **BACKWARD_TRANSITIVE** | BACKWARD across all versions | Read any old data | Strictest backward compatibility |
| **FORWARD_TRANSITIVE** | FORWARD across all versions | Read any new data | Strictest forward compatibility |
| **FULL_TRANSITIVE** | FULL across all versions | Complete bi-directional | Strictest overall |
**Default**: `BACKWARD` (recommended for production)
### Schema Evolution Strategies
**Adding Fields**:
```avro
// V1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
// V2 - BACKWARD compatible (added optional field with default)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
```
**Removing Fields** (BACKWARD compatible):
```avro
// V1
{"name": "address", "type": "string"}
// V2 - Field removed; readers on the new schema simply skip it in old records
// Field removed from schema
```
**Changing Field Types** (Breaking Change!):
```avro
// ❌ BREAKING - Cannot change string to int
{"name": "age", "type": "string"} → {"name": "age", "type": "int"}
// ✅ SAFE - Use union types
{"name": "age", "type": ["string", "int"], "default": "unknown"}
```
## When to Use This Skill
Activate me when you need help with:
- Schema evolution strategies ("How do I evolve my Avro schema?")
- Compatibility mode selection ("Which compatibility mode for production?")
- Schema validation ("Validate my Avro schema")
- Best practices ("Schema Registry best practices")
- Schema registration ("Register Avro schema with Schema Registry")
- Debugging schema issues ("Schema compatibility error")
- Format comparison ("Avro vs Protobuf vs JSON Schema")
## Best Practices
### 1. Always Use Compatible Evolution
**DO**:
- Add optional fields with defaults
- Remove optional fields
- Use union types for flexibility
- Test schema changes in staging first
**DON'T**:
- Change field types
- Remove required fields
- Rename fields (add new + deprecate old)
- Use `NONE` compatibility in production
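For renames specifically, Avro's `aliases` attribute lets a new field name resolve data written under the old name; a hedged sketch (field names are illustrative):
```avro
// V1 had {"name": "name", "type": "string"}
// V2 - schema resolution maps old "name" values into "full_name" via the alias
{"name": "full_name", "type": "string", "aliases": ["name"]}
```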
### 2. Schema Naming Conventions
**Hierarchical Namespaces**:
```
com.company.domain.EntityName
com.acme.ecommerce.Order
com.acme.ecommerce.OrderLineItem
```
**Subject Naming** (Kafka topics):
- `<topic-name>-value` - For record values
- `<topic-name>-key` - For record keys
- Example: `orders-value`, `orders-key`
### 3. Schema Registry Configuration
**Producer** (with Avro):
```javascript
const { Kafka } = require('kafkajs');
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');
const kafka = new Kafka({ clientId: 'user-service', brokers: ['localhost:9092'] });
const producer = kafka.producer();
const registry = new SchemaRegistry({
host: 'https://schema-registry:8081',
auth: {
username: 'SR_API_KEY',
password: 'SR_API_SECRET'
}
});
// Register schema
const schema = `
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
`;
const { id } = await registry.register({
type: SchemaType.AVRO,
schema
});
// Encode message with schema
const payload = await registry.encode(id, {
id: 1,
name: 'John Doe'
});
await producer.connect();
await producer.send({
topic: 'users',
messages: [{ value: payload }]
});
```
**Consumer** (with Avro):
```javascript
const consumer = kafka.consumer({ groupId: 'user-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'users' });
await consumer.run({
eachMessage: async ({ message }) => {
// Decode message (schema ID is in header)
const decodedMessage = await registry.decode(message.value);
console.log(decodedMessage); // { id: 1, name: 'John Doe' }
}
});
```
### 4. Schema Validation Workflow
**Before Registering**:
1. Validate schema syntax (Avro JSON, .proto, JSON Schema)
2. Check compatibility with existing versions
3. Test with sample data
4. Register in dev/staging first
5. Deploy to production after validation
**CLI Validation**:
```bash
# Check compatibility (before registering)
curl -X POST http://localhost:8081/compatibility/subjects/users-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{...}"}'
# Register schema
curl -X POST http://localhost:8081/subjects/users-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{...}"}'
```
## Common Issues & Solutions
### Issue 1: Schema Compatibility Error
**Error**:
```
Schema being registered is incompatible with an earlier schema
```
**Root Cause**: Violates compatibility mode (e.g., removed required field with BACKWARD mode)
**Solution**:
1. Check current compatibility mode:
```bash
curl http://localhost:8081/config/users-value
```
2. Fix schema to be compatible OR change mode (carefully!)
3. Validate before registering:
```bash
curl -X POST http://localhost:8081/compatibility/subjects/users-value/versions/latest \
-d '{"schema": "{...}"}'
```
### Issue 2: Schema Not Found
**Error**:
```
Subject 'users-value' not found
```
**Root Cause**: Schema not registered yet OR wrong subject name
**Solution**:
1. List all subjects:
```bash
curl http://localhost:8081/subjects
```
2. Register schema if missing
3. Check subject naming convention (`<topic>-key` or `<topic>-value`)
### Issue 3: Message Deserialization Failed
**Error**:
```
Unknown magic byte!
```
**Root Cause**: Message not encoded with Schema Registry (missing magic byte + schema ID)
**Solution**:
1. Ensure producer uses Schema Registry encoder
2. Check message format: [magic_byte(1) + schema_id(4) + payload]
3. Use `@kafkajs/confluent-schema-registry` library
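The wire format above can be checked by hand; a minimal sketch assuming a Node.js `Buffer` (the `parseWireFormat` helper is illustrative, not part of any library):

```javascript
// Confluent wire format: [magic byte 0x00][4-byte big-endian schema ID][payload]
function parseWireFormat(buf) {
  if (buf.length < 5 || buf.readUInt8(0) !== 0) {
    throw new Error('Unknown magic byte!'); // not Schema Registry encoded
  }
  return {
    schemaId: buf.readUInt32BE(1), // look this ID up in the registry to decode
    payload: buf.subarray(5),      // the serialized Avro/Protobuf/JSON bytes
  };
}

// A message encoded with schema ID 42:
const msg = Buffer.concat([Buffer.from([0, 0, 0, 0, 42]), Buffer.from('abc')]);
console.log(parseWireFormat(msg).schemaId); // 42
```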
## Schema Evolution Decision Tree
```
Need to change schema?
├─ Adding new field?
│ ├─ Required field? → Add with default value (BACKWARD)
│ └─ Optional field? → Add with default null (BACKWARD)
├─ Removing field?
│ ├─ Required field? → ❌ BREAKING CHANGE (coordinate upgrade)
│ └─ Optional field? → ✅ BACKWARD compatible
├─ Changing field type?
│ ├─ Compatible types (e.g., int → long)? → Use union types
│ └─ Incompatible types? → ❌ BREAKING CHANGE (add new field, deprecate old)
└─ Renaming field?
└─ ❌ BREAKING CHANGE → Add new field + mark old as deprecated
```
## Avro vs Protobuf vs JSON Schema Comparison
| Feature | Avro | Protobuf | JSON Schema |
|---------|------|----------|-------------|
| **Encoding** | Binary | Binary | Text (JSON) |
| **Message Size** | Small (compact binary) | Small (compact binary) | Largest (plain text) |
| **Human Readable** | No | No | Yes |
| **Schema Evolution** | Excellent | Good | Fair |
| **Language Support** | Java, Python, C/C++, and more | 20+ languages | Universal |
| **Performance** | Very Fast | Very Fast | Slower |
| **Debugging** | Harder | Harder | Easy |
| **Best For** | Data warehousing, ETL | Polyglot, gRPC | REST APIs, dev |
**Recommendation**:
- **Production**: Avro (best balance)
- **Polyglot teams**: Protobuf
- **Development/Debugging**: JSON Schema
## References
- Schema Registry REST API: https://docs.confluent.io/platform/current/schema-registry/develop/api.html
- Avro Specification: https://avro.apache.org/docs/current/spec.html
- Protobuf Guide: https://developers.google.com/protocol-buffers
- JSON Schema Spec: https://json-schema.org/
---
**Invoke me when you need schema management, evolution strategies, or compatibility guidance!**