commit 694e6f31174501ebbf46aff06c7d9c1ef837ec98
Author: Zhongwei Li
Date:   Sat Nov 29 17:56:48 2025 +0800

    Initial commit

diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json
new file mode 100644
index 0000000..ef02a2a
--- /dev/null
+++ b/.claude-plugin/plugin.json
@@ -0,0 +1,15 @@
+{
+  "name": "specweave-kafka-streams",
+  "description": "Kafka Streams library integration for SpecWeave - Stream processing with Java/Kotlin, topology patterns, state stores, windowing, joins, and testing frameworks",
+  "version": "0.24.0",
+  "author": {
+    "name": "SpecWeave Team",
+    "url": "https://spec-weave.com"
+  },
+  "skills": [
+    "./skills"
+  ],
+  "commands": [
+    "./commands"
+  ]
+}
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8a880b6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# specweave-kafka-streams
+
+Kafka Streams library integration for SpecWeave - Stream processing with Java/Kotlin, topology patterns, state stores, windowing, joins, and testing frameworks
diff --git a/commands/topology.md b/commands/topology.md
new file mode 100644
index 0000000..aca651d
--- /dev/null
+++ b/commands/topology.md
@@ -0,0 +1,437 @@
+---
+name: specweave-kafka-streams:topology
+description: Generate Kafka Streams topology code (Java/Kotlin) with KStream/KTable patterns. Creates stream processing applications with windowing, joins, state stores, and exactly-once semantics.
+---
+
+# Generate Kafka Streams Topology
+
+Create production-ready Kafka Streams applications with best practices baked in.
+
+## What This Command Does
+
+1. **Select Pattern**: Choose a topology pattern (word count, enrichment, aggregation, etc.)
+2. **Configure Topics**: Input/output topics and schemas
+3. **Define Operations**: Filter, map, join, aggregate, window
+4. **Generate Code**: Java or Kotlin implementation
+5. **Add Tests**: Topology Test Driver unit tests
+6. **Build Configuration**: Gradle/Maven, dependencies, configs
+
+## Available Patterns
+
+### Pattern 1: Stream Processing (Filter + Transform)
+**Use Case**: Data cleansing and transformation
+
+**Topology**:
+```java
+KStream<String, Event> events = builder.stream("raw-events");
+
+KStream<String, Event> processed = events
+    .filter((key, value) -> value.isValid())
+    .mapValues(value -> value.toUpperCase())  // assumes Event.toUpperCase() returns a cleaned Event
+    .selectKey((key, value) -> value.getUserId());
+
+processed.to("processed-events");
+```
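+
+The snippet above relies on the configured default serdes. A minimal sketch of the explicit wiring it assumes, with `eventSerde` as a placeholder (a `JsonSerde` implementation appears under Configuration Options below):
+
+```java
+// Hypothetical serde wiring for the pattern above; eventSerde is an assumption, not generated code.
+Serde<Event> eventSerde = new JsonSerde<>(Event.class);
+
+KStream<String, Event> events = builder.stream(
+    "raw-events",
+    Consumed.with(Serdes.String(), eventSerde));
+
+events
+    .filter((key, value) -> value.isValid())
+    .to("processed-events", Produced.with(Serdes.String(), eventSerde));
+```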
+
+### Pattern 2: Stream-Table Join (Enrichment)
+**Use Case**: Enrich events with reference data
+
+**Topology**:
+```java
+// Users table (changelog stream)
+KTable<Long, User> users = builder.table("users");
+
+// Click events
+KStream<Long, Click> clicks = builder.stream("clicks");
+
+// Enrich clicks with user data
+KStream<Long, EnrichedClick> enriched = clicks.leftJoin(
+    users,
+    (click, user) -> new EnrichedClick(
+        click.getPage(),
+        user != null ? user.getName() : "unknown",
+        click.getTimestamp()
+    )
+);
+
+enriched.to("enriched-clicks");
+```
+
+### Pattern 3: Windowed Aggregation
+**Use Case**: Time-based metrics (counts, sums, averages)
+
+**Topology**:
+```java
+KTable<Windowed<String>, Long> counts = events
+    .groupByKey()
+    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
+    .count(Materialized.as("event-counts"));
+
+counts.toStream()
+    .map((windowedKey, count) -> {
+        String key = windowedKey.key();
+        Instant start = windowedKey.window().startTime();
+        return KeyValue.pair(key, new WindowedCount(key, start, count));
+    })
+    .to("event-counts-output");
+```
+
+### Pattern 4: Stateful Deduplication
+**Use Case**: Remove duplicate events within a time window
+
+**Topology**:
+```java
+// The "dedup-store" state store must be registered on the builder first
+KStream<String, Event> deduplicated = events
+    .transformValues(
+        () -> new DeduplicationTransformer(Duration.ofMinutes(10)),
+        "dedup-store"
+    );
+
+deduplicated.to("unique-events");
+```
+
+## Example Usage
+
+```bash
+# Generate topology
+/specweave-kafka-streams:topology
+
+# I'll ask:
+# 1. Language? (Java or Kotlin)
+# 2. Pattern? (Filter/Transform, Join, Aggregation, Deduplication)
+# 3. Input topic(s)?
+# 4. Output topic(s)?
+# 5. Windowing? (if aggregation)
+# 6. State store? (if stateful)
+# 7. Build tool? (Gradle or Maven)
+
+# Then I'll generate:
+# - src/main/java/MyApp.java (application code)
+# - src/test/java/MyAppTest.java (unit tests)
+# - build.gradle or pom.xml
+# - application.properties
+# - README.md with setup instructions
+```
+
+## Generated Files
+
+**1. StreamsApplication.java**: Main topology
+```java
+package com.example.streams;
+
+import java.util.Properties;
+
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.*;
+
+public class StreamsApplication {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+                  StreamsConfig.EXACTLY_ONCE_V2);
+
+        StreamsBuilder builder = new StreamsBuilder();
+
+        // Topology code here
+        KStream<String, String> input = builder.stream("input-topic");
+        KStream<String, String> processed = input
+            .filter((key, value) -> value != null)
+            .mapValues(value -> value.toUpperCase());
+        processed.to("output-topic");
+
+        KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+
+        // Graceful shutdown
+        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
+    }
+}
+```
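+
+For production use you will usually also want an uncaught-exception handler, so a failed stream thread is replaced rather than killing the whole application. A minimal sketch (call before `streams.start()`; the handler type lives in `org.apache.kafka.streams.errors`):
+
+```java
+// Replace a crashed stream thread instead of shutting the client down.
+streams.setUncaughtExceptionHandler(exception ->
+    StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
+```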
+
+**2. StreamsApplicationTest.java**: Unit tests with Topology Test Driver
+```java
+package com.example.streams;
+
+import java.util.Properties;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.junit.jupiter.api.*;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class StreamsApplicationTest {
+    private TopologyTestDriver testDriver;
+    private TestInputTopic<String, String> inputTopic;
+    private TestOutputTopic<String, String> outputTopic;
+
+    @BeforeEach
+    public void setup() {
+        StreamsBuilder builder = new StreamsBuilder();
+        // Build the same topology as the application
+        KStream<String, String> input = builder.stream("input-topic");
+        input.filter((key, value) -> value != null)
+             .mapValues(value -> value.toUpperCase())
+             .to("output-topic");
+
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        testDriver = new TopologyTestDriver(builder.build(), props);
+        inputTopic = testDriver.createInputTopic("input-topic",
+            Serdes.String().serializer(),
+            Serdes.String().serializer());
+        outputTopic = testDriver.createOutputTopic("output-topic",
+            Serdes.String().deserializer(),
+            Serdes.String().deserializer());
+    }
+
+    @Test
+    public void testTransformation() {
+        // Send test data
+        inputTopic.pipeInput("key1", "hello");
+
+        // Assert output
+        KeyValue<String, String> output = outputTopic.readKeyValue();
+        assertEquals("HELLO", output.value);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        testDriver.close();
+    }
+}
+```
+
+**3. build.gradle**: Gradle build configuration
+```groovy
+plugins {
+    id 'java'
+    id 'application'
+}
+
+group = 'com.example'
+version = '1.0.0'
+
+repositories {
+    mavenCentral()
+}
+
+dependencies {
+    implementation 'org.apache.kafka:kafka-streams:3.6.0'
+    implementation 'org.slf4j:slf4j-simple:2.0.9'
+
+    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.6.0'
+    testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
+}
+
+application {
+    mainClass = 'com.example.streams.StreamsApplication'
+}
+
+test {
+    useJUnitPlatform()
+}
+```
+
+**4. application.properties**: Runtime configuration
+```properties
+bootstrap.servers=localhost:9092
+application.id=my-streams-app
+processing.guarantee=exactly_once_v2
+commit.interval.ms=100
+cache.max.bytes.buffering=10485760
+num.stream.threads=2
+replication.factor=3
+```
+
+**5. README.md**: Setup instructions
+````markdown
+# Kafka Streams Application
+
+## Build
+
+```bash
+# Gradle
+./gradlew build
+
+# Maven
+mvn clean package
+```
+
+## Run
+
+```bash
+# Gradle
+./gradlew run
+
+# Maven
+mvn exec:java
+```
+
+## Test
+
+```bash
+# Unit tests
+./gradlew test
+
+# Integration tests (requires Kafka cluster)
+./gradlew integrationTest
+```
+
+## Docker
+
+```bash
+# Build image
+docker build -t my-streams-app .
+
+# Run
+docker run -e BOOTSTRAP_SERVERS=kafka:9092 my-streams-app
+```
+````
+
+## Configuration Options
+
+### Exactly-Once Semantics (EOS v2)
+```java
+props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+          StreamsConfig.EXACTLY_ONCE_V2);
+```
+
+### Multiple Stream Threads
+```java
+props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
+```
+
+### State Store Configuration
+```java
+StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
+    Stores.keyValueStoreBuilder(
+        Stores.persistentKeyValueStore("my-store"),
+        Serdes.String(),
+        Serdes.Long()
+    )
+    .withCachingEnabled()
+    .withLoggingEnabled(Map.of("retention.ms", "86400000"));
+```
+
+### Custom Serdes
+```java
+// JSON Serde (using Jackson)
+public class JsonSerde<T> implements Serde<T> {
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final Class<T> type;
+
+    public JsonSerde(Class<T> type) {
+        this.type = type;
+    }
+
+    @Override
+    public Serializer<T> serializer() {
+        return (topic, data) -> {
+            try {
+                return mapper.writeValueAsBytes(data);
+            } catch (Exception e) {
+                throw new SerializationException(e);
+            }
+        };
+    }
+
+    @Override
+    public Deserializer<T> deserializer() {
+        return (topic, data) -> {
+            try {
+                return mapper.readValue(data, type);
+            } catch (Exception e) {
+                throw new SerializationException(e);
+            }
+        };
+    }
+}
+```
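+
+A usage sketch for the serde above (topic and type names are illustrative):
+
+```java
+// Wire the JSON serde into a typed stream explicitly.
+Serde<User> userSerde = new JsonSerde<>(User.class);
+
+KStream<Long, User> users = builder.stream(
+    "users",
+    Consumed.with(Serdes.Long(), userSerde));
+
+users.to("users-normalized", Produced.with(Serdes.Long(), userSerde));
+```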
+
+## Testing Strategies
+
+### 1. Unit Tests (Topology Test Driver)
+```java
+// No Kafka cluster required
+TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
+```
+
+### 2. Integration Tests (Embedded Kafka)
+```java
+@ExtendWith(EmbeddedKafkaExtension.class)
+public class IntegrationTest {
+    @Test
+    public void testWithRealKafka(EmbeddedKafka kafka) {
+        // Real Kafka cluster
+    }
+}
+```
+
+### 3. Performance Tests (Load Testing)
+```bash
+# Generate test load
+kafka-producer-perf-test.sh \
+  --topic input-topic \
+  --num-records 1000000 \
+  --throughput 10000 \
+  --record-size 1024 \
+  --producer-props bootstrap.servers=localhost:9092
+```
+
+## Monitoring
+
+### JMX Metrics
+```java
+// Record more granular metrics (JMX itself is always on)
+props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
+
+// Export via a metrics reporter (e.g., Confluent's)
+props.put("metric.reporters",
+          "io.confluent.metrics.reporter.ConfluentMetricsReporter");
+```
+
+### Key Metrics to Monitor
+- **Consumer Lag**: `kafka.consumer.fetch.manager.records.lag.max`
+- **Processing Rate**: `kafka.streams.stream.task.process.rate`
+- **State Store Size**: `kafka.streams.state.store.bytes.total`
+- **Rebalance Frequency**: `kafka.streams.consumer.coordinator.rebalance.total`
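+
+You can also read these metrics in-process rather than over JMX. A sketch (exact metric names vary across Kafka versions, so treat the filter below as illustrative):
+
+```java
+import java.util.Map;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+// Log selected health metrics from a running KafkaStreams instance.
+static void logHealthMetrics(KafkaStreams streams) {
+    for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
+        MetricName name = entry.getKey();
+        if (name.name().equals("process-rate") || name.name().equals("records-lag-max")) {
+            System.out.printf("%s (%s) = %s%n",
+                name.name(), name.group(), entry.getValue().metricValue());
+        }
+    }
+}
+```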
"files": [ + { + "path": "README.md", + "sha256": "1904889d8eb5971d149f64cbffcde444867365525d75698bc275aebd79910b93" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "13c93052727765230b15347ee3207e16fd395a6e9af6adcd4dc0aa52cfd49d7d" + }, + { + "path": "commands/topology.md", + "sha256": "78e2b7ea8aea466b55647bae98f96be4b22a1b3fdf2d95a8595c5d4bc1fc33c8" + }, + { + "path": "skills/kafka-streams-topology/SKILL.md", + "sha256": "b9db3de04661dcfff703fc611a668e11420919fb9624581fb9a6c9ff0bd3193b" + } + ], + "dirSha256": "e1f429a5d1e8be3446af8477174213bfdd8056566bf41a72b69bc7ca208e38d2" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/skills/kafka-streams-topology/SKILL.md b/skills/kafka-streams-topology/SKILL.md new file mode 100644 index 0000000..d6bc546 --- /dev/null +++ b/skills/kafka-streams-topology/SKILL.md @@ -0,0 +1,539 @@ +--- +name: kafka-streams-topology +description: Kafka Streams topology design expert. Covers KStream vs KTable vs GlobalKTable, topology patterns, stream operations (filter, map, flatMap, branch), joins, windowing strategies, and exactly-once semantics. Activates for kafka streams topology, kstream, ktable, globalkTable, stream operations, stream joins, windowing, exactly-once, topology design. +--- + +# Kafka Streams Topology Skill + +Expert knowledge of Kafka Streams library for building stream processing topologies in Java/Kotlin. + +## What I Know + +### Core Abstractions + +**KStream** (Event Stream - Unbounded, Append-Only): +- Represents immutable event sequences +- Each record is an independent event +- Use for: Clickstreams, transactions, sensor readings + +**KTable** (Changelog Stream - Latest State by Key): +- Represents mutable state (compacted topic) +- Updates override previous values (by key) +- Use for: User profiles, product catalog, account balances + +**GlobalKTable** (Replicated Table - Available on All Instances): +- Full table replicated to every stream instance +- No partitioning (broadcast) +- Use for: Reference data (countries, products), lookups + +**Key Differences**: +```java +// KStream: Every event is independent +KStream clicks = builder.stream("clicks"); +clicks.foreach((key, value) -> { + System.out.println(value); // Prints every click event +}); + +// KTable: Latest value wins (by key) +KTable users = builder.table("users"); +users.toStream().foreach((key, value) -> { + System.out.println(value); // Prints only current user state +}); + +// GlobalKTable: Replicated to all instances (no partitioning) +GlobalKTable products = builder.globalTable("products"); +// Available for lookups on any instance (no repartitioning needed) +``` + +## When to Use This Skill + +Activate me when you need help with: +- Topology design ("How to design Kafka Streams topology?") +- KStream vs KTable ("When to use KStream vs KTable?") +- Stream operations ("Filter and transform events") +- Joins ("Join KStream with KTable") +- Windowing ("Tumbling vs hopping vs session windows") +- Exactly-once semantics ("Enable EOS") +- Topology optimization ("Optimize stream processing") + +## Common Patterns + +### Pattern 1: Filter and Transform + +**Use Case**: Clean and enrich events + +```java +StreamsBuilder builder = new StreamsBuilder(); + +// Input stream +KStream clicks = builder.stream("clicks"); + +// Filter out bot clicks +KStream humanClicks = clicks + .filter((key, value) -> !value.isBot()); + +// Transform: Extract page from URL +KStream pages = humanClicks + 
+
+### Pattern 2: Branch by Condition
+
+**Use Case**: Route events to different paths
+
+```java
+Map<String, KStream<String, Order>> branches = orders
+    .split(Named.as("order-"))
+    .branch((key, order) -> order.getTotal() > 1000, Branched.as("high-value"))
+    .branch((key, order) -> order.getTotal() > 100, Branched.as("medium-value"))
+    .defaultBranch(Branched.as("low-value"));
+
+// High-value orders → priority processing
+branches.get("order-high-value").to("priority-orders");
+
+// Low-value orders → standard processing
+branches.get("order-low-value").to("standard-orders");
+```
+
+### Pattern 3: Enrich Stream with Table (Stream-Table Join)
+
+**Use Case**: Add user details to click events
+
+```java
+// Users table (current state)
+KTable<Long, User> users = builder.table("users");
+
+// Clicks stream
+KStream<Long, Click> clicks = builder.stream("clicks");
+
+// Enrich clicks with user data (left join)
+KStream<Long, EnrichedClick> enriched = clicks.leftJoin(
+    users,
+    (click, user) -> new EnrichedClick(
+        click.getPage(),
+        user != null ? user.getName() : "unknown",
+        user != null ? user.getEmail() : "unknown"
+    ),
+    Joined.with(Serdes.Long(), clickSerde, userSerde)
+);
+
+enriched.to("enriched-clicks");
+```
+
+### Pattern 4: Aggregate with Windowing
+
+**Use Case**: Count clicks per user, per 5-minute window
+
+```java
+KTable<Windowed<Long>, Long> clickCounts = clicks
+    .groupByKey()
+    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
+    .count(Materialized.as("click-counts-store"));
+
+// Convert to stream for output
+clickCounts.toStream()
+    .map((windowedKey, count) -> {
+        Long userId = windowedKey.key();
+        Instant start = windowedKey.window().startTime();
+        Instant end = windowedKey.window().endTime();
+        return KeyValue.pair(userId, new WindowedCount(userId, start, end, count));
+    })
+    .to("click-counts");
+```
+
+### Pattern 5: Stateful Processing with State Store
+
+**Use Case**: Detect duplicate events within 10 minutes
+
+```java
+// Define state store
+StoreBuilder<KeyValueStore<Long, Long>> storeBuilder =
+    Stores.keyValueStoreBuilder(
+        Stores.persistentKeyValueStore("dedup-store"),
+        Serdes.Long(),
+        Serdes.Long()
+    );
+
+builder.addStateStore(storeBuilder);
+
+// Deduplicate events
+KStream<Long, Event> deduplicated = events.transformValues(
+    () -> new ValueTransformerWithKey<Long, Event, Event>() {
+        private KeyValueStore<Long, Long> store;
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.store = context.getStateStore("dedup-store");
+        }
+
+        @Override
+        public Event transform(Long key, Event value) {
+            Long lastSeen = store.get(key);
+            long now = System.currentTimeMillis();
+
+            // Duplicate detected (within 10 minutes)
+            if (lastSeen != null && (now - lastSeen) < 600_000) {
+                return null;  // Drop duplicate
+            }
+
+            // Not a duplicate, store the timestamp
+            store.put(key, now);
+            return value;
+        }
+
+        @Override
+        public void close() {}
+    },
+    "dedup-store"
+).filter((key, value) -> value != null);  // Remove nulls
+
+deduplicated.to("unique-events");
+```
+
+## Join Types
+
+### 1. Stream-Stream Join (Inner)
+
+**Use Case**: Correlate related events within a time window
+
+```java
+// Page views and clicks within 10 minutes
+KStream<Long, View> views = builder.stream("page-views");
+KStream<Long, Click> clicks = builder.stream("clicks");
+
+KStream<Long, ClickWithView> joined = clicks.join(
+    views,
+    (click, view) -> new ClickWithView(click, view),
+    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
+    StreamJoined.with(Serdes.Long(), clickSerde, viewSerde)
+);
+```
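+
+`ofTimeDifferenceWithNoGrace` drops late-arriving records. If your sources can deliver records out of order, a hedged variant is to allow a grace period (values below are illustrative):
+
+```java
+// Accept records up to 2 minutes late before the join window closes.
+JoinWindows windows = JoinWindows.ofTimeDifferenceAndGrace(
+    Duration.ofMinutes(10),  // join window: |ts(left) - ts(right)| <= 10 min
+    Duration.ofMinutes(2));  // grace period for late records
+```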
+
+### 2. Stream-Table Join (Left)
+
+**Use Case**: Enrich events with current state
+
+```java
+// Add product details to order items
+KTable<String, Product> products = builder.table("products");
+KStream<String, OrderItem> items = builder.stream("order-items");
+
+KStream<String, EnrichedOrderItem> enriched = items.leftJoin(
+    products,
+    (item, product) -> new EnrichedOrderItem(
+        item,
+        product != null ? product.getName() : "Unknown",
+        product != null ? product.getPrice() : 0.0
+    )
+);
+```
+
+### 3. Table-Table Join (Inner)
+
+**Use Case**: Combine two tables (latest state)
+
+```java
+// Join users with their current shopping cart
+KTable<Long, User> users = builder.table("users");
+KTable<Long, Cart> carts = builder.table("shopping-carts");
+
+KTable<Long, UserWithCart> joined = users.join(
+    carts,
+    (user, cart) -> new UserWithCart(user.getName(), cart.getTotal())
+);
+```
+
+### 4. Stream-GlobalKTable Join
+
+**Use Case**: Enrich with reference data (no repartitioning)
+
+```java
+// Add country details to user registrations
+GlobalKTable<String, Country> countries = builder.globalTable("countries");
+KStream<Long, Registration> registrations = builder.stream("registrations");
+
+KStream<Long, EnrichedRegistration> enriched = registrations.leftJoin(
+    countries,
+    (userId, registration) -> registration.getCountryCode(),  // Key extractor
+    (registration, country) -> new EnrichedRegistration(
+        registration,
+        country != null ? country.getName() : "Unknown"
+    )
+);
+```
+
+## Windowing Strategies
+
+### Tumbling Windows (Non-Overlapping)
+
+**Use Case**: Aggregate per fixed time period
+
+```java
+// Count events every 5 minutes
+KTable<Windowed<String>, Long> counts = events
+    .groupByKey()
+    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
+    .count();
+
+// Windows: [0:00-0:05), [0:05-0:10), [0:10-0:15)
+```
+
+### Hopping Windows (Overlapping)
+
+**Use Case**: Moving average or overlapping aggregates
+
+```java
+// Count events in 10-minute windows, advancing every 5 minutes
+KTable<Windowed<String>, Long> counts = events
+    .groupByKey()
+    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
+        .advanceBy(Duration.ofMinutes(5)))
+    .count();
+
+// Windows: [0:00-0:10), [0:05-0:15), [0:10-0:20)
+```
+
+### Session Windows (Event-Based)
+
+**Use Case**: User sessions with an inactivity gap
+
+```java
+// Session ends after 30 minutes of inactivity
+KTable<Windowed<Long>, Long> sessionCounts = events
+    .groupByKey()
+    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
+    .count();
+```
+
+### Sliding Windows (Continuous)
+
+**Use Case**: Anomaly detection over a sliding time window
+
+```java
+// Detect >100 events in any 1-minute period
+KTable<Windowed<Long>, Long> slidingCounts = events
+    .groupByKey()
+    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)))
+    .count();
+```
+
+## Best Practices
+
+### 1. Partition Keys Correctly
+
+✅ **DO**:
+```java
+// Re-key by user_id once, before aggregating
+KStream<Long, Event> byUser = events
+    .selectKey((key, value) -> value.getUserId());
+
+// The repartition happens once, at a predictable place
+KTable<Long, Long> userCounts = byUser
+    .groupByKey()
+    .count();
+```
+
+❌ **DON'T**:
+```java
+// Re-keying inside each aggregation hides a repartition topic per use
+KTable<Long, Long> userCounts = events
+    .groupBy((key, value) -> value.getUserId())
+    .count();
+```
+
+### 2. Use Appropriate Serdes
+
+✅ **DO**:
+```java
+// Define a custom serde for complex types
+Serde<User> userSerde = new JsonSerde<>(User.class);
+
+KStream<Long, User> users = builder.stream(
+    "users",
+    Consumed.with(Serdes.Long(), userSerde)
+);
+```
+
+❌ **DON'T**:
+```java
+// WRONG: No serde specified - falls back to the configured defaults,
+// which rarely match your types (recent versions have no default at all)
+KStream<Long, User> users = builder.stream("users");
+```
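+
+If most of your topics share one key/value type, you can also set application-wide defaults so bare `builder.stream(...)` calls stay well-defined. A sketch:
+
+```java
+// Application-wide fallback serdes, used when a stream/table doesn't specify any.
+props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
+props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+```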
+
+### 3. Enable Exactly-Once Semantics
+
+✅ **DO**:
+```java
+Properties props = new Properties();
+props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+          StreamsConfig.EXACTLY_ONCE_V2);  // EOS v2 (recommended)
+props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);  // Commit frequently
+```
+
+### 4. Use Materialized Stores for Queries
+
+✅ **DO**:
+```java
+// Named store for interactive queries
+KTable<Long, Long> counts = events
+    .groupByKey()
+    .count(Materialized.<Long, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
+        .withKeySerde(Serdes.Long())
+        .withValueSerde(Serdes.Long()));
+
+// Query the store from a REST API
+ReadOnlyKeyValueStore<Long, Long> store =
+    streams.store(StoreQueryParameters.fromNameAndType(
+        "user-counts",
+        QueryableStoreTypes.keyValueStore()
+    ));
+
+Long count = store.get(userId);
+```
+
+## Topology Optimization
+
+### 1. Combine Operations
+
+**GOOD** (Single pass):
+```java
+KStream<String, String> result = events
+    .filter((key, value) -> value != null)
+    .mapValues(value -> value.toUpperCase())
+    .filterNot((key, value) -> value.contains("test"));
+```
+
+**BAD** (Multiple intermediate topics):
+```java
+KStream<String, String> valid = events.filter((key, value) -> value != null);
+valid.to("valid-events");  // Unnecessary write
+
+KStream<String, String> fromValid = builder.stream("valid-events");
+KStream<String, String> upper = fromValid.mapValues(v -> v.toUpperCase());
+```
+
+### 2. Reuse KTables
+
+**GOOD** (Shared table):
+```java
+KTable<Long, User> users = builder.table("users");
+
+KStream<Long, EnrichedClick> enrichedClicks = clicks.leftJoin(users, ...);
+KStream<Long, EnrichedOrder> enrichedOrders = orders.leftJoin(users, ...);
+```
+
+**BAD** (Duplicate tables):
+```java
+KTable<Long, User> users1 = builder.table("users");
+KTable<Long, User> users2 = builder.table("users");  // Duplicate!
+```
+
+## Testing Topologies
+
+### Topology Test Driver
+
+```java
+@Test
+public void testClickFilter() {
+    // Set up the topology
+    StreamsBuilder builder = new StreamsBuilder();
+    KStream<Long, Click> clicks = builder.stream(
+        "clicks", Consumed.with(Serdes.Long(), clickSerde));
+    clicks.filter((key, value) -> !value.isBot())
+          .to("human-clicks", Produced.with(Serdes.Long(), clickSerde));
+
+    Topology topology = builder.build();
+
+    // Create the test driver
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology);
+
+    // Input topic
+    TestInputTopic<Long, Click> inputTopic = testDriver.createInputTopic(
+        "clicks",
+        Serdes.Long().serializer(),
+        clickSerde.serializer()
+    );
+
+    // Output topic
+    TestOutputTopic<Long, Click> outputTopic = testDriver.createOutputTopic(
+        "human-clicks",
+        Serdes.Long().deserializer(),
+        clickSerde.deserializer()
+    );
+
+    // Send test data
+    inputTopic.pipeInput(1L, new Click(1L, "page1", false));  // Human
+    inputTopic.pipeInput(2L, new Click(2L, "page2", true));   // Bot
+
+    // Assert output
+    List<Click> output = outputTopic.readValuesToList();
+    assertEquals(1, output.size());  // Only the human click
+    assertFalse(output.get(0).isBot());
+
+    testDriver.close();
+}
+```
+
+## Common Issues & Solutions
+
+### Issue 1: StreamsException - Not Co-Partitioned
+
+**Error**: Topics not co-partitioned for a join
+
+**Root Cause**: Joined streams/tables have different partition counts
+
+**Solution**: Repartition one side to match the other:
+```java
+// Ensure the same partition count on both sides of the join
+KStream<Long, Event> repartitioned = events
+    .repartition(Repartitioned.<Long, Event>with(Serdes.Long(), eventSerde)
+        .withNumberOfPartitions(12));  // Match the other join input
+```
+
+### Issue 2: Out of Memory (Large State Store)
+
+**Error**: Java heap space
+
+**Root Cause**: State store too large, windowing not used
+
+**Solution**: Use windowing to limit state size:
+```java
+KTable<Windowed<Long>, Long> counts = events
+    .groupByKey()
+    .windowedBy(TimeWindows.ofSizeAndGrace(
+        Duration.ofHours(24),  // Window size
+        Duration.ofHours(1)    // Grace period
+    ))
+    .count();
+```
+
+### Issue 3: High Lag, Slow Processing
+
+**Root Cause**: Blocking operations, inefficient transformations
+
+**Solution**: Keep blocking I/O off the stream thread; enrich from local state instead:
+```java
+// BAD: Blocking HTTP call
+events.mapValues(value -> {
+    return httpClient.get(value.getUrl());  // BLOCKS the stream thread!
+});
+
+// GOOD: Enrich from a local state store (see sketch below)
+events.transformValues(() -> new AsyncEnricher(), "enrichment-cache");
+```
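+
+`AsyncEnricher` is not spelled out in this skill. A minimal sketch of one possible shape, assuming an "enrichment-cache" store that another part of the topology keeps populated from a reference topic:
+
+```java
+// Hypothetical enricher: local store lookup instead of a network call per record.
+public class AsyncEnricher implements ValueTransformerWithKey<Long, Event, Event> {
+    private KeyValueStore<String, String> cache;
+
+    @Override
+    public void init(ProcessorContext context) {
+        this.cache = context.getStateStore("enrichment-cache");
+    }
+
+    @Override
+    public Event transform(Long key, Event value) {
+        String metadata = cache.get(value.getUrl());  // local RocksDB lookup, no I/O wait
+        return metadata != null ? value.withMetadata(metadata) : value;  // withMetadata is assumed
+    }
+
+    @Override
+    public void close() {}
+}
+```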
+
+## References
+
+- Kafka Streams Documentation: https://kafka.apache.org/documentation/streams/
+- Kafka Streams Tutorial: https://kafka.apache.org/documentation/streams/tutorial
+- Testing Guide: https://kafka.apache.org/documentation/streams/developer-guide/testing.html
+
+---
+
+**Invoke me when you need topology design, joins, windowing, or exactly-once semantics expertise!**