# ksqlDB Query Generator

Generate ksqlDB queries for stream processing.

## Task

You are a ksqlDB expert. Generate ksqlDB queries for stream processing, aggregations, and joins.

### Steps:

1. **Ask for Requirements**:
   - Query type: Stream, Table, Join, Aggregation
   - Source topics/streams
   - Output requirements

2. **Generate ksqlDB Statements**:

#### Create Stream from Topic:
```sql
CREATE STREAM users_stream (
  id VARCHAR KEY,
  email VARCHAR,
  name VARCHAR,
  region VARCHAR,       -- referenced by the aggregation and query examples
  created_at BIGINT     -- event time in epoch milliseconds
) WITH (
  KAFKA_TOPIC='users',
  VALUE_FORMAT='JSON',
  TIMESTAMP='created_at'
);
```

#### Create Table (Materialized View):
```sql
CREATE TABLE user_counts AS
  SELECT
    region,
    COUNT(*) AS user_count,
    COLLECT_LIST(name) AS user_names
  FROM users_stream
  GROUP BY region
  EMIT CHANGES;
```

#### Stream-Stream Join:
```sql
CREATE STREAM orders_enriched AS
  SELECT
    o.user_id,          -- the join key must appear in the projection
    o.order_id,
    o.product_id,
    o.quantity,
    o.price,
    u.name AS customer_name,
    u.email AS customer_email,
    o.timestamp
  FROM orders_stream o
  INNER JOIN users_stream u
    WITHIN 1 HOUR
    ON o.user_id = u.id
  EMIT CHANGES;
```

#### Windowed Aggregation:
```sql
-- Tumbling Window (5-minute windows)
CREATE TABLE sales_by_category_5min AS
  SELECT
    category,
    WINDOWSTART AS window_start,
    WINDOWEND AS window_end,
    COUNT(*) AS order_count,
    SUM(amount) AS total_sales,
    AVG(amount) AS avg_sale,
    MAX(amount) AS max_sale
  FROM orders_stream
  WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY category
  EMIT CHANGES;

-- Hopping Window (5-min window, 1-min advance)
CREATE TABLE sales_hopping AS
  SELECT
    category,
    WINDOWSTART AS window_start,
    COUNT(*) AS order_count
  FROM orders_stream
  WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
  GROUP BY category
  EMIT CHANGES;

-- Session Window (inactivity gap = 30 minutes)
CREATE TABLE user_sessions AS
  SELECT
    user_id,
    WINDOWSTART AS session_start,
    WINDOWEND AS session_end,
    COUNT(*) AS event_count
  FROM user_events_stream
  WINDOW SESSION (30 MINUTES)
  GROUP BY user_id
  EMIT CHANGES;
```

#### Filtering and Transformation:
```sql
CREATE STREAM high_value_orders AS
  SELECT
    order_id,
    user_id,
    amount,
    UCASE(status) AS status,
    CASE
      WHEN amount > 1000 THEN 'PREMIUM'
      WHEN amount > 500 THEN 'STANDARD'
      ELSE 'BASIC'
    END AS tier
  FROM orders_stream
  WHERE amount > 100
  EMIT CHANGES;
```

#### Array and Map Operations:
```sql
CREATE STREAM processed_events AS
  SELECT
    id,
    ARRAY_CONTAINS(tags, 'premium') AS is_premium,
    ARRAY_LENGTH(items) AS item_count,
    MAP_KEYS(metadata) AS meta_keys,
    metadata['source'] AS source
  FROM events_stream
  EMIT CHANGES;
```

3. **Generate Queries**:

```sql
-- Push query (continuous)
SELECT * FROM users_stream
WHERE region = 'US'
EMIT CHANGES;

-- Pull query (one-time, requires table)
SELECT * FROM user_counts
WHERE region = 'US';
```

4. **Generate Monitoring Commands**:

```sql
-- Show streams
SHOW STREAMS;

-- Describe stream
DESCRIBE users_stream;

-- Show queries
SHOW QUERIES;

-- Explain query (query_id is a placeholder; use an ID listed by SHOW QUERIES)
EXPLAIN query_id;

-- Terminate query (same placeholder)
TERMINATE query_id;
```

5. **Best Practices**:
   - Choose the window type (tumbling, hopping, or session) that matches the aggregation
   - Set RETENTION on windowed aggregations to bound state size
   - Use pull queries for point-in-time lookups
   - Co-partition join inputs: same key and same partition count
   - Add error handling for UDFs
   - Monitor query performance
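
The retention and partitioning practices above can be sketched as follows. This is a minimal illustration, not a recommended configuration: the table and stream names `sales_by_category_daily` and `orders_by_user`, the durations, and the partition count are all assumptions, and `orders_stream` is assumed to carry `category`, `amount`, and `user_id` columns as in the earlier examples.

```sql
-- Bound window state: keep 7 days of windows and accept events that
-- arrive up to 10 minutes late. Both durations are illustrative.
CREATE TABLE sales_by_category_daily AS
  SELECT
    category,
    COUNT(*) AS order_count,
    SUM(amount) AS total_sales
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 DAY, RETENTION 7 DAYS, GRACE PERIOD 10 MINUTES)
  GROUP BY category
  EMIT CHANGES;

-- Re-key orders by user_id so the stream is co-partitioned with the
-- other join input. PARTITIONS = 6 is a placeholder; it should equal
-- the partition count of the topic on the other side of the join.
CREATE STREAM orders_by_user
  WITH (PARTITIONS = 6) AS
  SELECT *
  FROM orders_stream
  PARTITION BY user_id
  EMIT CHANGES;
```

Joining against `orders_by_user` instead of `orders_stream` would then satisfy the co-partitioning requirement, provided the other input really has the same partition count.
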
### Example Usage:

```
User: "Calculate hourly sales by category"
Result: Complete ksqlDB window aggregation query
```
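
For the request above, a result might look like the following sketch. It assumes `orders_stream` exposes `category` and `amount` columns, as in the windowed aggregation examples; the table name `sales_by_category_hourly` is hypothetical.

```sql
-- Hourly sales per category using a 1-hour tumbling window.
-- Column names and the table name are assumptions for illustration.
CREATE TABLE sales_by_category_hourly AS
  SELECT
    category,
    WINDOWSTART AS window_start,
    WINDOWEND AS window_end,
    COUNT(*) AS order_count,
    SUM(amount) AS total_sales
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY category
  EMIT CHANGES;
```

A tumbling window is used here because hourly totals should not overlap; a hopping window would count each order in more than one window.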