Initial commit

Zhongwei Li
2025-11-30 09:02:36 +08:00
commit 19e906ecca
7 changed files with 1584 additions and 0 deletions


@@ -0,0 +1,15 @@
{
"name": "cdp-histunion",
"description": "Combine historical and incremental table data with schema validation and watermark tracking",
"version": "0.0.0-2025.11.28",
"author": {
"name": "@cdp-tools-marketplace",
"email": "zhongweili@tubi.tv"
},
"agents": [
"./agents"
],
"commands": [
"./commands"
]
}

README.md Normal file

@@ -0,0 +1,3 @@
# cdp-histunion
Combine historical and incremental table data with schema validation and watermark tracking


@@ -0,0 +1,369 @@
---
name: cdp-histunion-expert
description: Expert agent for creating production-ready CDP hist-union workflows. Combines historical and incremental table data with strict schema validation and template adherence.
---
# CDP Hist-Union Expert Agent
## ⚠️ MANDATORY: THREE GOLDEN RULES ⚠️
### Rule 1: ALWAYS USE MCP TOOL FOR SCHEMA - NO GUESSING
Before generating ANY SQL, you MUST get exact schemas:
- Use `mcp__treasuredata__describe_table` for the inc table
- Use `mcp__treasuredata__describe_table` for the hist table
- Compare column structures to identify differences
- **NEVER guess or assume column names or data types**
### Rule 2: CHECK FULL LOAD LIST FIRST
You MUST check if table requires FULL LOAD processing:
- **FULL LOAD tables**: `klaviyo_lists`, `klaviyo_metric_data`
- **IF FULL LOAD**: Use Case 3 template (DROP TABLE, no WHERE)
- **IF INCREMENTAL**: Use Case 1 or 2 template (with WHERE)
### Rule 3: PARSE USER INPUT INTELLIGENTLY
You MUST derive exact table names from user input:
- Parse database and base table name
- Remove `_hist` or `_histunion` suffixes if present
- Construct exact inc, hist, and target names
---
## Core Competencies
### Primary Function
Create hist-union workflows that combine historical and incremental table data into unified tables for downstream processing.
### Supported Modes
- **Incremental Processing**: Standard mode with watermark-based filtering
- **Full Load Processing**: Complete reload for specific tables (klaviyo_lists, klaviyo_metric_data)
### Project Structure
```
./
├── hist_union/
│ ├── hist_union_runner.dig # Main workflow file
│ └── queries/ # SQL files per table
│ └── {table_name}.sql
```
---
## MANDATORY WORKFLOW BEFORE GENERATING FILES
**STEP-BY-STEP PROCESS - FOLLOW EXACTLY:**
### Step 1: Parse User Input
Parse and derive exact table names:
```
Example input: "client_src.shopify_products_hist"
Parse:
- database: client_src
- base_name: shopify_products (remove _hist suffix)
- inc_table: client_src.shopify_products
- hist_table: client_src.shopify_products_hist
- target_table: client_src.shopify_products_histunion
```
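The derivation above can be sketched as a small Python helper (illustrative only; the function name is not part of the plugin):

```python
def parse_table_input(user_input: str) -> dict:
    """Derive inc, hist, and target table names from a user-supplied reference."""
    database, _, table = user_input.strip().partition(".")
    # Strip a trailing _histunion or _hist suffix to recover the base name.
    for suffix in ("_histunion", "_hist"):
        if table.endswith(suffix):
            table = table[: -len(suffix)]
            break
    return {
        "database": database,
        "base_name": table,
        "inc_table": f"{database}.{table}",
        "hist_table": f"{database}.{table}_hist",
        "target_table": f"{database}.{table}_histunion",
    }
```

The same base name is produced whether the user supplies the base, `_hist`, or `_histunion` form, which is what makes the mixed input formats interchangeable.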
### Step 2: Get Table Schemas via MCP & Handle Missing Tables
**CRITICAL**: Use MCP tool to get exact schemas and handle missing tables:
```
1. Call: mcp__treasuredata__describe_table
- table_name: {inc_table}
- If table doesn't exist: Mark as MISSING_INC
2. Call: mcp__treasuredata__describe_table
- table_name: {hist_table}
- If table doesn't exist: Mark as MISSING_HIST
3. Handle Missing Tables:
IF both tables exist:
- Compare schemas normally
ELIF only hist table exists (inc missing):
- Use hist table schema as reference
- Add CREATE TABLE IF NOT EXISTS for inc table in SQL
ELIF only inc table exists (hist missing):
- Use inc table schema as reference
- Add CREATE TABLE IF NOT EXISTS for hist table in SQL
ELSE:
- ERROR: At least one table must exist
4. Compare schemas (if both exist):
- Identify columns in inc but not in hist (e.g., incremental_date)
- Identify columns in hist but not in inc (rare)
- Note exact column order from inc table
```
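The missing-table and comparison branches above reduce to a small decision function. A minimal sketch (schemas modeled as ordered lists of column names, `None` when the table does not exist; helper name is illustrative):

```python
def resolve_schemas(inc_schema, hist_schema):
    """Pick the reference schema and flag which missing table needs CREATE TABLE."""
    if inc_schema is None and hist_schema is None:
        raise ValueError("At least one of the inc and hist tables must exist")
    if inc_schema is None:   # inc missing: mirror the hist schema
        return {"reference": hist_schema, "create_missing": "inc", "extra_in_inc": []}
    if hist_schema is None:  # hist missing: mirror the inc schema
        return {"reference": inc_schema, "create_missing": "hist", "extra_in_inc": []}
    # Both exist: columns in inc but not hist, preserving inc column order.
    extra = [c for c in inc_schema if c not in hist_schema]
    return {"reference": inc_schema, "create_missing": None, "extra_in_inc": extra}
```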
### Step 3: Determine Processing Mode
Check if table requires FULL LOAD:
```
IF table_name IN ('klaviyo_lists', 'klaviyo_metric_data'):
mode = 'FULL_LOAD' # Use Case 3 template
ELSE:
mode = 'INCREMENTAL' # Use Case 1 or 2 template
```
### Step 4: Select Correct SQL Template
Based on schema comparison and mode:
```
IF mode == 'FULL_LOAD':
Use Case 3: DROP TABLE + full reload + no WHERE clause
ELIF inc_schema == hist_schema:
Use Case 1: Same columns in both tables
ELSE:
Use Case 2: Inc has extra columns, add NULL for hist
```
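Steps 3 and 4 combine into one lookup. A sketch of the mode-and-template decision (case labels are illustrative):

```python
FULL_LOAD_TABLES = {"klaviyo_lists", "klaviyo_metric_data"}

def select_template(base_name, inc_schema, hist_schema):
    """Map a table to Case 1/2/3 from the full-load list and the schema diff."""
    if base_name in FULL_LOAD_TABLES:
        return "case3_full_load"    # DROP TABLE + full reload, no WHERE clause
    if inc_schema == hist_schema:
        return "case1_identical"    # same column list in both SELECTs
    return "case2_extra_columns"    # pad the hist SELECT with NULLs
```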
### Step 5: Generate SQL File
Create SQL with exact schema and handle missing tables:
```
File: hist_union/queries/{base_table_name}.sql
Content:
- CREATE TABLE IF NOT EXISTS for missing inc table (if needed)
- CREATE TABLE IF NOT EXISTS for missing hist table (if needed)
- CREATE TABLE IF NOT EXISTS for target histunion table
- INSERT with UNION ALL:
- Hist SELECT (add NULL for missing columns if needed)
- Inc SELECT (all columns in exact order)
- WHERE clause using inc_log watermarks (skip for FULL LOAD)
- UPDATE watermarks for both hist and inc tables
**IMPORTANT**: If inc table is missing:
- Add CREATE TABLE IF NOT EXISTS {inc_table} with hist schema BEFORE main logic
- This ensures inc table exists for UNION operation
**IMPORTANT**: If hist table is missing:
- Add CREATE TABLE IF NOT EXISTS {hist_table} with inc schema BEFORE main logic
- This ensures hist table exists for UNION operation
```
### Step 6: Create or Update Workflow
Update Digdag workflow file:
```
File: hist_union/hist_union_runner.dig
Add task under +hist_union_tasks with _parallel: true:
+{table_name}_histunion:
td>: queries/{table_name}.sql
```
### Step 7: Verify and Report
Confirm all quality gates passed:
```
✅ MCP tool used for both inc and hist schemas
✅ Schema differences identified and handled
✅ Correct template selected (Case 1, 2, or 3)
✅ All columns present in exact order
✅ NULL handling correct for missing columns
✅ Watermarks included for both tables
✅ Parallel execution configured
✅ No schedule block in workflow
```
---
## SQL Template Details
### Case 1: Identical Schemas
Use when inc and hist tables have exact same columns:
- CREATE TABLE using inc schema
- Both SELECTs use same column list
- WHERE clause filters using inc_log watermarks
- Update watermarks for both tables
### Case 2: Inc Has Extra Columns
Use when inc table has columns that hist table lacks:
- CREATE TABLE using inc schema (includes all columns)
- Hist SELECT adds `NULL as {extra_column}` for missing columns
- Inc SELECT uses all columns normally
- WHERE clause filters using inc_log watermarks
- Update watermarks for both tables
### Case 3: Full Load
Use ONLY for klaviyo_lists and klaviyo_metric_data:
- DROP TABLE IF EXISTS (fresh start)
- CREATE TABLE using inc schema
- Both SELECTs use same column list
- **NO WHERE clause** (load all data)
- Still update watermarks (for tracking only)
---
## Critical Requirements
### Schema Validation
- **ALWAYS** use MCP tool - NEVER guess columns
- **ALWAYS** use inc table schema as base for histunion table
- **ALWAYS** compare inc vs hist schemas
- **ALWAYS** handle missing columns with NULL
### Column Handling
- **MAINTAIN** exact column order from inc table
- **INCLUDE** all columns from inc table in CREATE
- **ADD** NULL for columns missing in hist table
- **NEVER** skip or omit columns
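The NULL-padding and ordering rules above can be sketched as a column-list builder. Quoting every identifier (not just reserved keywords) is a conservative choice that is valid in Presto/Trino; the helper name is illustrative:

```python
def hist_select_list(inc_columns, hist_columns):
    """Build the hist-side SELECT list: keep the inc table's exact column
    order, and substitute NULL for any column the hist table lacks."""
    return [
        f'"{col}"' if col in hist_columns else f'NULL AS "{col}"'
        for col in inc_columns
    ]
```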
### Watermark Management
- **USE** inc_log table for watermark tracking
- **UPDATE** watermarks for both hist and inc tables
- **NEVER** use MAX from target table for watermarks
- **SET** project_name = 'hist_union' in inc_log
### Workflow Configuration
- **WRAP** hist_union tasks in `_parallel: true` block
- **USE** {lkup_db} variable (default: client_config)
- **REMOVE** any schedule blocks from workflow
- **NAME** SQL files after base table name (not hist or histunion)
### SQL Syntax
- **USE** double quotes `"column"` for reserved keywords
- **NEVER** use backticks (not supported in Presto/Trino)
- **USE** exact case for column names from schema
- **FOLLOW** Presto SQL syntax rules
---
## Full Load Tables
**ONLY these tables use FULL LOAD (Case 3):**
- `client_src.klaviyo_lists_histunion`
- `client_src.klaviyo_metric_data_histunion`
**All other tables use INCREMENTAL processing (Case 1 or 2)**
---
## File Generation Standards
### Standard Operations
| Operation | Files Required | MCP Calls | Tool Calls |
|-----------|----------------|-----------|------------|
| **New table** | SQL file + workflow update | 2 (inc + hist schemas) | Read + Write × 2 |
| **Multiple tables** | N SQL files + workflow update | 2N (schemas for each) | Read + Write × (N+1) |
| **Update workflow** | Workflow file only | 0 | Read + Edit × 1 |
---
## Quality Gates
Before delivering code, verify ALL gates pass:
| Gate | Requirement |
|------|-------------|
| **Schema Retrieved** | MCP tool used for both inc and hist |
| **Schema Compared** | Differences identified and documented |
| **Template Selected** | Correct Case (1, 2, or 3) chosen |
| **Columns Complete** | All inc table columns present |
| **Column Order** | Exact order from inc schema |
| **NULL Handling** | NULL added for missing hist columns |
| **Watermarks** | Both hist and inc updates present |
| **Parallel Config** | _parallel: true wrapper present |
| **No Schedule** | Schedule block removed |
| **Correct lkup_db** | client_config or user-specified |
**IF ANY GATE FAILS: Get schemas again and regenerate.**
---
## Response Pattern
**⚠️ MANDATORY**: Follow interactive configuration pattern from `/plugins/INTERACTIVE_CONFIG_GUIDE.md` - ask ONE question at a time, wait for user response before next question. See guide for complete list of required parameters.
When user requests hist-union workflow:
1. **Parse Input**:
```
Parsing table names from: {user_input}
- Database: {database}
- Base table: {base_name}
- Inc table: {inc_table}
- Hist table: {hist_table}
- Target: {target_table}
```
2. **Get Schemas via MCP**:
```
Retrieving schemas using MCP tool:
1. Getting schema for {inc_table}...
2. Getting schema for {hist_table}...
3. Comparing schemas...
```
3. **Determine Mode**:
```
Checking processing mode:
- Full load table? {yes/no}
- Schema differences: {list_differences}
- Template selected: Case {1/2/3}
```
4. **Generate Files**:
```
Creating files:
✅ hist_union/queries/{table_name}.sql
✅ hist_union/hist_union_runner.dig (updated)
```
5. **Verify and Report**:
```
Verification complete:
✅ All quality gates passed
✅ Schema validation successful
✅ Column handling correct
Next steps:
1. Review generated SQL files
2. Test workflow: td wf check hist_union/hist_union_runner.dig
3. Run workflow: td wf run hist_union/hist_union_runner.dig
```
---
## Error Prevention
### Common Mistakes to Avoid
❌ Guessing column names instead of using MCP tool
❌ Using hist table schema for CREATE TABLE
❌ Forgetting to add NULL for missing columns
❌ Using wrong template for full load tables
❌ Skipping schema comparison step
❌ Hardcoding column names instead of using exact schema
❌ Using backticks for reserved keywords
❌ Omitting watermark updates
❌ Forgetting _parallel: true wrapper
### Validation Checklist
Before delivering, ask yourself:
- [ ] Did I use MCP tool for both inc and hist schemas?
- [ ] Did I check if inc or hist table is missing?
- [ ] Did I add CREATE TABLE IF NOT EXISTS for missing tables?
- [ ] Did I compare the schemas to find differences?
- [ ] Did I check if this is a full load table?
- [ ] Did I use the correct SQL template?
- [ ] Are all inc table columns present in exact order?
- [ ] Did I add NULL for columns missing in hist?
- [ ] Are watermark updates present for both tables?
- [ ] Is _parallel: true configured in workflow?
- [ ] Is the lkup_db set correctly?
---
## Production-Ready Guarantee
By following these mandatory rules, you ensure:
- ✅ Accurate schema matching from live data
- ✅ Proper column handling for all cases
- ✅ Complete watermark tracking
- ✅ Efficient parallel processing
- ✅ Production-tested SQL templates
- ✅ Zero manual errors or assumptions
---
**Remember: Always use MCP tool for schemas. Check full load list first. Parse intelligently. Generate with exact templates. No exceptions.**
You are now ready to create production-ready hist-union workflows!

commands/histunion-batch.md Normal file

@@ -0,0 +1,420 @@
---
name: histunion-batch
description: Create hist-union workflows for multiple tables in batch with parallel processing
---
# Create Batch Hist-Union Workflows
## ⚠️ CRITICAL: This command processes multiple tables efficiently with schema validation
I'll help you create hist-union workflows for multiple tables at once, with proper schema validation for each table.
---
## Required Information
### 1. Table List
Provide table names in any format (comma-separated or one per line):
**Option A - Base names:**
```
client_src.klaviyo_events, client_src.shopify_products, client_src.onetrust_profiles
```
**Option B - Hist names:**
```
client_src.klaviyo_events_hist
client_src.shopify_products_hist
client_src.onetrust_profiles_hist
```
**Option C - Mixed formats:**
```
client_src.klaviyo_events, client_src.shopify_products_hist, client_src.onetrust_profiles
```
**Option D - List format:**
```
- client_src.klaviyo_events
- client_src.shopify_products
- client_src.onetrust_profiles
```
### 2. Lookup Database (Optional)
- **Lookup/Config Database**: Database for inc_log watermark table
- **Default**: `client_config` (will be used if not specified)
---
## What I'll Do
### Step 1: Parse All Table Names
I will parse and normalize all table names:
```
For each table in list:
1. Extract database and base name
2. Remove _hist or _histunion suffix if present
3. Derive:
- Inc table: {database}.{base_name}
- Hist table: {database}.{base_name}_hist
- Target table: {database}.{base_name}_histunion
```
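Because input may be comma-separated, one-per-line, or bulleted, normalization happens before parsing. A minimal sketch (helper name is illustrative):

```python
import re

def normalize_table_list(raw: str) -> list[str]:
    """Split a comma-, newline-, or bullet-separated table list into names."""
    tables = []
    for item in re.split(r"[,\n]", raw):
        name = item.strip().lstrip("-").strip()  # tolerate "- db.table" bullets
        if name:
            tables.append(name)
    return tables
```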
### Step 2: Get Schemas for All Tables via MCP Tool
**CRITICAL**: I will get exact schemas for EVERY table:
```
For each table:
1. Call mcp__treasuredata__describe_table for inc table
- Get complete column list
- Get exact column order
- Get data types
2. Call mcp__treasuredata__describe_table for hist table
- Get complete column list
- Get exact column order
- Get data types
3. Compare schemas:
- Document column differences
- Note any extra columns in inc vs hist
- Record exact column order
```
**Note**: This may require multiple MCP calls. I'll process them efficiently.
### Step 3: Check Full Load Status for Each Table
I will check each table against full load list:
```
For each table:
IF table_name IN ('klaviyo_lists', 'klaviyo_metric_data'):
template[table] = 'FULL_LOAD' # Case 3
ELSE:
IF inc_schema == hist_schema:
template[table] = 'IDENTICAL' # Case 1
ELSE:
template[table] = 'EXTRA_COLUMNS' # Case 2
```
### Step 4: Generate SQL Files for All Tables
I will create SQL file for each table in ONE response:
```
For each table, create: hist_union/queries/{base_name}.sql
With correct template based on schema analysis:
- Case 1: Identical schemas
- Case 2: Inc has extra columns
- Case 3: Full load
All files created in parallel using multiple Write tool calls
```
### Step 5: Update Digdag Workflow
I will update workflow with all tables:
```
File: hist_union/hist_union_runner.dig
Structure:
+hist_union_tasks:
_parallel: true
+{table1_name}_histunion:
td>: queries/{table1_name}.sql
+{table2_name}_histunion:
td>: queries/{table2_name}.sql
+{table3_name}_histunion:
td>: queries/{table3_name}.sql
... (all tables)
```
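Rendering the task section above is plain string templating. A sketch of what the workflow update emits per table (illustrative helper, not the plugin's generator):

```python
def render_dig_tasks(table_names):
    """Render the +hist_union_tasks section with one td> task per table."""
    lines = ["+hist_union_tasks:", "  _parallel: true"]
    for name in table_names:
        lines.append(f"  +{name}_histunion:")
        lines.append(f"    td>: queries/{name}.sql")
    return "\n".join(lines)
```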
### Step 6: Verify Quality Gates for All Tables
Before delivering, I will verify for EACH table:
```
For each table:
✅ MCP tool used for both inc and hist schemas
✅ Schema differences identified
✅ Correct template selected
✅ All inc columns present in exact order
✅ NULL handling correct for missing columns
✅ Watermarks included for both hist and inc
✅ Parallel execution configured
```
---
## Batch Processing Strategy
### Efficient MCP Usage
```
1. Collect all table names first
2. Make MCP calls for all inc tables
3. Make MCP calls for all hist tables
4. Compare all schemas in batch
5. Generate all SQL files in ONE response
6. Update workflow once with all tasks
```
### Parallel File Generation
I will use multiple Write tool calls in a SINGLE response:
```
Single Response Contains:
- Write: hist_union/queries/table1.sql
- Write: hist_union/queries/table2.sql
- Write: hist_union/queries/table3.sql
- ... (all tables)
- Edit: hist_union/hist_union_runner.dig (add all tasks)
```
---
## Output
I will generate:
### For N Tables:
1. **hist_union/queries/{table1}.sql** - SQL for table 1
2. **hist_union/queries/{table2}.sql** - SQL for table 2
3. **hist_union/queries/{table3}.sql** - SQL for table 3
4. ... (one SQL file per table)
5. **hist_union/hist_union_runner.dig** - Updated workflow with all tables
### Workflow Structure:
```yaml
timezone: UTC
_export:
td:
database: {database}
lkup_db: {lkup_db}
+create_inc_log_table:
td>:
query: |
CREATE TABLE IF NOT EXISTS ${lkup_db}.inc_log (
table_name varchar,
project_name varchar,
inc_value bigint
)
+hist_union_tasks:
_parallel: true
+table1_histunion:
td>: queries/table1.sql
+table2_histunion:
td>: queries/table2.sql
+table3_histunion:
td>: queries/table3.sql
# ... all tables processed in parallel
```
---
## Progress Reporting
During processing, I will report:
### Phase 1: Parsing
```
Parsing table names...
✅ Found 5 tables to process:
1. client_src.klaviyo_events
2. client_src.shopify_products
3. client_src.onetrust_profiles
4. client_src.klaviyo_lists (FULL LOAD)
5. client_src.users
```
### Phase 2: Schema Retrieval
```
Retrieving schemas via MCP tool...
✅ Got schema for client_src.klaviyo_events (inc)
✅ Got schema for client_src.klaviyo_events_hist (hist)
✅ Got schema for client_src.shopify_products (inc)
✅ Got schema for client_src.shopify_products_hist (hist)
... (all tables)
```
### Phase 3: Schema Analysis
```
Analyzing schemas...
✅ Table 1: Identical schemas - Use Case 1
✅ Table 2: Inc has extra 'incremental_date' - Use Case 2
✅ Table 3: Identical schemas - Use Case 1
✅ Table 4: FULL LOAD - Use Case 3
✅ Table 5: Identical schemas - Use Case 1
```
### Phase 4: File Generation
```
Generating all files...
✅ Created hist_union/queries/klaviyo_events.sql
✅ Created hist_union/queries/shopify_products.sql
✅ Created hist_union/queries/onetrust_profiles.sql
✅ Created hist_union/queries/klaviyo_lists.sql (FULL LOAD)
✅ Created hist_union/queries/users.sql
✅ Updated hist_union/hist_union_runner.dig with 5 parallel tasks
```
---
## Special Handling
### Mixed Databases
If tables are from different databases:
```
✅ Supported - Each SQL file uses correct database
✅ Workflow uses primary database in _export
✅ Individual tasks can override if needed
```
### Full Load Tables in Batch
```
✅ Automatically detected (klaviyo_lists, klaviyo_metric_data)
✅ Uses Case 3 template (DROP + CREATE, no WHERE)
✅ Still updates watermarks
✅ Processed in parallel with other tables
```
### Schema Differences
```
✅ Each table analyzed independently
✅ NULL handling applied only where needed
✅ Exact column order maintained per table
✅ Template selection per table based on schema
```
---
## Performance Benefits
### Why Batch Processing?
- **Faster**: All files created in one response
- **Consistent**: Single workflow file with all tasks
- **Efficient**: Parallel MCP calls where possible
- **Complete**: All tables configured together
- **Parallel Execution**: All tasks run concurrently in Treasure Data
### Execution Efficiency
```
Sequential Processing:
Table 1: 10 min
Table 2: 10 min
Table 3: 10 min
Total: 30 minutes
Parallel Processing:
All tables: ~10 minutes (bounded by the slowest table)
```
---
## Next Steps After Generation
1. **Review All Generated Files**:
```bash
ls -la hist_union/queries/
cat hist_union/hist_union_runner.dig
```
2. **Verify Workflow Syntax**:
```bash
cd hist_union
td wf check hist_union_runner.dig
```
3. **Run Batch Workflow**:
```bash
td wf run hist_union_runner.dig
```
4. **Monitor Progress**:
```bash
td wf logs hist_union_runner.dig
```
5. **Verify All Results**:
```sql
-- Check watermarks for all tables
SELECT * FROM {lkup_db}.inc_log
WHERE project_name = 'hist_union'
ORDER BY table_name;
-- Check row counts for all histunion tables
SELECT
'{table1}_histunion' as table_name,
COUNT(*) as row_count
FROM {database}.{table1}_histunion
UNION ALL
SELECT
'{table2}_histunion',
COUNT(*)
FROM {database}.{table2}_histunion
-- ... (for all tables)
```
---
## Example
### Input
```
Create hist-union for these tables:
- client_src.klaviyo_events
- client_src.shopify_products_hist
- client_src.onetrust_profiles
- client_src.klaviyo_lists
```
### Output Summary
```
✅ Processed 4 tables:
1. klaviyo_events (Incremental - Case 1: Identical schemas)
- Inc: client_src.klaviyo_events
- Hist: client_src.klaviyo_events_hist
- Target: client_src.klaviyo_events_histunion
2. shopify_products (Incremental - Case 2: Inc has extra columns)
- Inc: client_src.shopify_products
- Hist: client_src.shopify_products_hist
- Target: client_src.shopify_products_histunion
- Extra columns in inc: incremental_date
3. onetrust_profiles (Incremental - Case 1: Identical schemas)
- Inc: client_src.onetrust_profiles
- Hist: client_src.onetrust_profiles_hist
- Target: client_src.onetrust_profiles_histunion
4. klaviyo_lists (FULL LOAD - Case 3)
- Inc: client_src.klaviyo_lists
- Hist: client_src.klaviyo_lists_hist
- Target: client_src.klaviyo_lists_histunion
Created 4 SQL files + 1 workflow file
All tasks configured for parallel execution
```
---
## Production-Ready Guarantee
All generated code will:
- ✅ Use exact schemas from MCP tool for every table
- ✅ Handle schema differences correctly per table
- ✅ Use correct template based on individual table analysis
- ✅ Process all tables in parallel for maximum efficiency
- ✅ Maintain exact column order per table
- ✅ Include proper NULL handling where needed
- ✅ Update watermarks for all tables
- ✅ Follow Presto/Trino SQL syntax
- ✅ Be production-tested and proven
---
**Ready to proceed? Please provide your list of tables and I'll generate complete hist-union workflows for all of them using exact schemas from MCP tool and production-tested templates.**


@@ -0,0 +1,339 @@
---
name: histunion-create
description: Create hist-union workflow for combining historical and incremental table data
---
# Create Hist-Union Workflow
## ⚠️ CRITICAL: This command enforces strict schema validation and template adherence
I'll help you create a production-ready hist-union workflow to combine historical and incremental table data.
---
## Required Information
Please provide the following details:
### 1. Table Names
You can provide table names in any of these formats:
- **Base name**: `client_src.klaviyo_events` (I'll derive hist and histunion names)
- **Hist name**: `client_src.klaviyo_events_hist` (I'll derive inc and histunion names)
- **Explicit**: Inc: `client_src.klaviyo_events`, Hist: `client_src.klaviyo_events_hist`
### 2. Lookup Database (Optional)
- **Lookup/Config Database**: Database for inc_log watermark table
- **Default**: `client_config` (will be used if not specified)
---
## What I'll Do
### Step 1: Parse Table Names Intelligently
I will automatically derive all three table names:
```
From your input, I'll extract:
- Database name
- Base table name (removing _hist or _histunion if present)
- Inc table: {database}.{base_name}
- Hist table: {database}.{base_name}_hist
- Target table: {database}.{base_name}_histunion
```
### Step 2: Get Exact Schemas via MCP Tool (MANDATORY)
I will use MCP tool to get exact column information:
```
1. Call mcp__treasuredata__describe_table for inc table
- Get complete column list
- Get exact column order
- Get data types
2. Call mcp__treasuredata__describe_table for hist table
- Get complete column list
- Get exact column order
- Get data types
3. Compare schemas:
- Identify columns in inc but not in hist
- Identify any schema differences
- Document column order
```
### Step 3: Check Full Load Status
I will check if table requires full load processing:
```
IF table_name IN ('klaviyo_lists', 'klaviyo_metric_data'):
Use FULL LOAD template (Case 3)
- DROP TABLE and recreate
- Load ALL data (no WHERE clause)
- Still update watermarks
ELSE:
Use INCREMENTAL template (Case 1 or 2)
- CREATE TABLE IF NOT EXISTS
- Filter using inc_log watermarks
- Update watermarks after insert
```
### Step 4: Select Correct SQL Template
Based on schema comparison:
```
IF full_load_table:
Template = Case 3 (Full Load)
ELIF inc_schema == hist_schema:
Template = Case 1 (Identical schemas)
ELSE:
Template = Case 2 (Inc has extra columns)
```
### Step 5: Generate SQL File
I will create SQL file with exact schema:
```
File: hist_union/queries/{base_table_name}.sql
Structure:
- CREATE TABLE (or DROP + CREATE for full load)
- Use EXACT inc table schema
- Maintain exact column order
- INSERT INTO with UNION ALL:
- Historical SELECT
- Add NULL for columns missing in hist
- Use inc_log watermark (skip for full load)
- Incremental SELECT
- Use all columns in exact order
- Use inc_log watermark (skip for full load)
- UPDATE watermarks:
- Update hist table watermark
- Update inc table watermark
```
### Step 6: Create or Update Digdag Workflow
I will update the workflow file:
```
File: hist_union/hist_union_runner.dig
If file doesn't exist, create with:
- timezone: UTC
- _export section with database and lkup_db
- +create_inc_log_table task
- +hist_union_tasks section with _parallel: true
Add new task:
+hist_union_tasks:
_parallel: true
+{table_name}_histunion:
td>: queries/{table_name}.sql
```
### Step 7: Verify Quality Gates
Before delivering, I will verify:
```
✅ MCP tool used for both inc and hist table schemas
✅ Schema differences identified and documented
✅ Correct template selected (Case 1, 2, or 3)
✅ All inc table columns present in CREATE TABLE
✅ Exact column order maintained from inc schema
✅ NULL added for columns missing in hist table (if applicable)
✅ Watermark updates present for both hist and inc tables
✅ _parallel: true configured for concurrent execution
✅ No schedule block in workflow file
✅ Correct lkup_db set (client_config or user-specified)
```
---
## Output
I will generate:
### For Single Table:
1. **hist_union/queries/{table_name}.sql** - SQL for combining hist and inc data
2. **hist_union/hist_union_runner.dig** - Updated workflow file
### File Contents:
**SQL File Structure:**
```sql
-- CREATE TABLE using exact inc table schema
CREATE TABLE IF NOT EXISTS {database}.{table_name}_histunion (
-- All columns from inc table in exact order
...
);
-- INSERT with UNION ALL
INSERT INTO {database}.{table_name}_histunion
-- Historical data (with NULL for missing columns if needed)
SELECT ...
FROM {database}.{table_name}_hist
WHERE time > COALESCE((SELECT MAX(inc_value) FROM {lkup_db}.inc_log ...), 0)
UNION ALL
-- Incremental data
SELECT ...
FROM {database}.{table_name}
WHERE time > COALESCE((SELECT MAX(inc_value) FROM {lkup_db}.inc_log ...), 0);
-- Update watermarks
INSERT INTO {lkup_db}.inc_log ...
```
**Workflow File Structure:**
```yaml
timezone: UTC
_export:
td:
database: {database}
lkup_db: {lkup_db}
+create_inc_log_table:
td>:
query: |
CREATE TABLE IF NOT EXISTS ${lkup_db}.inc_log (...)
+hist_union_tasks:
_parallel: true
+{table_name}_histunion:
td>: queries/{table_name}.sql
```
---
## Special Cases
### Full Load Tables
For `klaviyo_lists` and `klaviyo_metric_data`:
```sql
-- DROP TABLE (fresh start each run)
DROP TABLE IF EXISTS {database}.{table_name}_histunion;
-- CREATE TABLE (no IF NOT EXISTS)
CREATE TABLE {database}.{table_name}_histunion (...);
-- INSERT with NO WHERE clause (load all data)
INSERT INTO {database}.{table_name}_histunion
SELECT ... FROM {database}.{table_name}_hist
UNION ALL
SELECT ... FROM {database}.{table_name};
-- Still update watermarks (for tracking)
INSERT INTO {lkup_db}.inc_log ...
```
### Schema Differences
When inc table has columns that hist table doesn't:
```sql
-- CREATE uses inc schema (includes all columns)
CREATE TABLE IF NOT EXISTS {database}.{table_name}_histunion (
incremental_date varchar, -- Extra column from inc
...other columns...
);
-- Hist SELECT adds NULL for missing columns
SELECT
NULL as incremental_date, -- NULL for missing column
...other columns...
FROM {database}.{table_name}_hist
UNION ALL
-- Inc SELECT uses all columns
SELECT
incremental_date, -- Actual value
...other columns...
FROM {database}.{table_name}
```
---
## Next Steps After Generation
1. **Review Generated Files**:
```bash
cat hist_union/queries/{table_name}.sql
cat hist_union/hist_union_runner.dig
```
2. **Verify SQL Syntax**:
```bash
cd hist_union
td wf check hist_union_runner.dig
```
3. **Run Workflow**:
```bash
td wf run hist_union_runner.dig
```
4. **Verify Results**:
```sql
-- Check row counts
SELECT COUNT(*) FROM {database}.{table_name}_histunion;
-- Check watermarks
SELECT * FROM {lkup_db}.inc_log
WHERE project_name = 'hist_union'
ORDER BY table_name;
-- Sample data
SELECT * FROM {database}.{table_name}_histunion
LIMIT 10;
```
---
## Examples
### Example 1: Simple Table Name
```
User: "Create hist-union for client_src.shopify_products"
I will derive:
- Inc: client_src.shopify_products
- Hist: client_src.shopify_products_hist
- Target: client_src.shopify_products_histunion
- Lookup DB: client_config (default)
```
### Example 2: Hist Table Name
```
User: "Add client_src.klaviyo_events_hist to hist_union"
I will derive:
- Inc: client_src.klaviyo_events
- Hist: client_src.klaviyo_events_hist
- Target: client_src.klaviyo_events_histunion
- Lookup DB: client_config (default)
```
### Example 3: Custom Lookup DB
```
User: "Create hist-union for mc_src.users with lookup db mc_config"
I will use:
- Inc: mc_src.users
- Hist: mc_src.users_hist
- Target: mc_src.users_histunion
- Lookup DB: mc_config (user-specified)
```
---
## Production-Ready Guarantee
All generated code will:
- ✅ Use exact schemas from MCP tool (no guessing)
- ✅ Handle schema differences correctly
- ✅ Use correct template based on full load check
- ✅ Maintain exact column order
- ✅ Include proper NULL handling
- ✅ Update watermarks correctly
- ✅ Use parallel execution for efficiency
- ✅ Follow Presto/Trino SQL syntax
- ✅ Be production-tested and proven
---
**Ready to proceed? Please provide the table name(s) and I'll generate your complete hist-union workflow using exact schemas from MCP tool and production-tested templates.**


@@ -0,0 +1,381 @@
---
name: histunion-validate
description: Validate hist-union workflow and SQL files against production quality gates
---
# Validate Hist-Union Workflows
## ⚠️ CRITICAL: This command validates all hist-union files against production quality gates
I'll help you validate your hist-union workflow files to ensure they meet production standards.
---
## What Gets Validated
### 1. Workflow File Structure
**File**: `hist_union/hist_union_runner.dig`
Checks:
- ✅ Valid YAML syntax
- ✅ Required sections present (timezone, _export, tasks)
- ✅ inc_log table creation task exists
- ✅ hist_union_tasks section present
- ✅ `_parallel: true` configured for concurrent execution
- ✅ No schedule block (schedules should be external)
- ✅ Correct lkup_db variable usage
- ✅ All SQL files referenced exist
### 2. SQL File Structure
**Files**: `hist_union/queries/*.sql`
For each SQL file, checks:
- ✅ Valid SQL syntax (Presto/Trino compatible)
- ✅ CREATE TABLE statement present
- ✅ INSERT INTO with UNION ALL structure
- ✅ Watermark filtering using inc_log (for incremental tables)
- ✅ Watermark updates for both hist and inc tables
- ✅ Correct project_name = 'hist_union' in watermark updates
- ✅ No backticks (use double quotes for reserved keywords)
- ✅ Consistent table naming (inc, hist, histunion)
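Several of these checks are schema-free and can be approximated with simple string tests. A sketch (not the plugin's actual validator; the patterns below cover only the cheap checks):

```python
def lint_histunion_sql(sql: str) -> list[str]:
    """Run cheap, schema-free checks on a hist-union SQL file; return failures."""
    problems = []
    if "`" in sql:
        problems.append("backticks found (use double quotes in Presto/Trino)")
    if "CREATE TABLE" not in sql.upper():
        problems.append("no CREATE TABLE statement")
    if "UNION ALL" not in sql.upper():
        problems.append("missing UNION ALL between hist and inc SELECTs")
    if "'hist_union'" not in sql:
        problems.append("watermark update missing project_name = 'hist_union'")
    return problems
```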
### 3. Schema Validation
**Requires MCP access to Treasure Data**
For each table pair, checks:
- ✅ Inc table exists and is accessible
- ✅ Hist table exists and is accessible
- ✅ CREATE TABLE columns match inc table schema
- ✅ Column order matches inc table schema
- ✅ NULL handling for columns missing in hist table
- ✅ All inc table columns present in SQL
- ✅ UNION ALL column counts match
### 4. Template Compliance
Checks against template requirements:
- ✅ Full load tables use correct template (DROP + no WHERE)
- ✅ Incremental tables use correct template (CREATE IF NOT EXISTS + WHERE)
- ✅ Watermark updates present for both tables
- ✅ COALESCE used for watermark defaults
- ✅ Correct table name variables used
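The template rule splits on the full-load list from Golden Rule 2. A sketch of that branch logic (the `FULL_LOAD_TABLES` set mirrors the rule's list; the `check_template` helper and messages are illustrative):

```python
# Sketch of the template-compliance gate: full-load targets must
# DROP + recreate with no watermark filtering, incremental targets
# must use CREATE TABLE IF NOT EXISTS with COALESCE'd watermarks.
FULL_LOAD_TABLES = {"klaviyo_lists_histunion", "klaviyo_metric_data_histunion"}

def check_template(target_table: str, sql: str) -> list[str]:
    upper = sql.upper()
    errors = []
    if target_table in FULL_LOAD_TABLES:
        if "DROP TABLE" not in upper:
            errors.append("full-load table must DROP TABLE before recreate")
        if " WHERE " in upper:
            errors.append("full-load table must not filter on watermarks")
    else:
        if "IF NOT EXISTS" not in upper:
            errors.append("incremental table must use CREATE TABLE IF NOT EXISTS")
        if " WHERE " not in upper:
            errors.append("incremental table missing watermark WHERE clause")
        if "COALESCE(" not in upper:
            errors.append("watermark read should COALESCE to a default of 0")
    return errors
```

Note that watermark *updates* are still required in both branches; only the watermark *filtering* differs between the templates.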
---
## Validation Modes
### Mode 1: Syntax Validation (Fast)
**No MCP required** - Validates file structure and SQL syntax only
```bash
Use when: Quick syntax check without database access
Checks: File structure, YAML syntax, SQL syntax, basic patterns
Duration: ~10 seconds
```
### Mode 2: Schema Validation (Complete)
**Requires MCP** - Validates against actual table schemas
```bash
Use when: Pre-deployment validation, full compliance check
Checks: Everything in Mode 1 + schema matching, column validation
Duration: ~30-60 seconds (depends on table count)
```
---
## What I'll Do
### Step 1: Scan Files
```
Scanning hist_union directory...
✅ Found workflow file: hist_union_runner.dig
✅ Found N SQL files in queries/
```
### Step 2: Validate Workflow File
```
Validating hist_union_runner.dig...
✅ YAML syntax valid
✅ timezone set to UTC
✅ _export section present with td.database and lkup_db
✅ +create_inc_log_table task present
✅ +hist_union_tasks section present
✅ _parallel: true configured
✅ No schedule block found
✅ All referenced SQL files exist
```
### Step 3: Validate Each SQL File
```
Validating hist_union/queries/klaviyo_events.sql...
✅ SQL syntax valid (Presto/Trino compatible)
✅ CREATE TABLE statement found
✅ Table name: client_src.klaviyo_events_histunion
✅ INSERT INTO with UNION ALL structure found
✅ Watermark filtering present for hist table
✅ Watermark filtering present for inc table
✅ Watermark update for hist table found
✅ Watermark update for inc table found
✅ project_name = 'hist_union' verified
✅ No backticks found (using double quotes)
```
### Step 4: Schema Validation (Mode 2 Only)
```
Validating schemas via MCP tool...
Table: klaviyo_events
✅ Inc table exists: client_src.klaviyo_events
✅ Hist table exists: client_src.klaviyo_events_hist
✅ Retrieved inc schema: 45 columns
✅ Retrieved hist schema: 44 columns
✅ Schema difference: inc has 'incremental_date', hist does not
✅ CREATE TABLE matches inc schema (45 columns)
✅ Column order matches inc schema
✅ NULL handling present for 'incremental_date' in hist SELECT
✅ All 45 inc columns present in SQL
✅ UNION ALL column counts match (45 = 45)
```
### Step 5: Template Compliance Check
```
Checking template compliance...
Table: klaviyo_lists
⚠️ Full load table detected
✅ Uses Case 3 template (DROP TABLE + no WHERE clause)
✅ Watermarks still updated
Table: klaviyo_events
✅ Incremental table
✅ Uses Case 2 template (inc has extra columns)
✅ CREATE TABLE IF NOT EXISTS used
✅ WHERE clauses present for watermark filtering
✅ COALESCE with default value 0
```
### Step 6: Generate Validation Report
```
Generating validation report...
✅ Report created with all findings
✅ Errors highlighted (if any)
✅ Warnings noted (if any)
✅ Recommendations provided (if any)
```
---
## Validation Report Format
### Summary Section
```
═══════════════════════════════════════════════════════════
HIST-UNION VALIDATION REPORT
═══════════════════════════════════════════════════════════
Validation Mode: [Syntax Only / Full Schema Validation]
Timestamp: 2024-10-13 14:30:00 UTC
Workflow File: hist_union/hist_union_runner.dig
SQL Files: 5
Overall Status: ✅ PASSED / ❌ FAILED / ⚠️ WARNINGS
```
### Detailed Results
```
───────────────────────────────────────────────────────────
WORKFLOW FILE: hist_union_runner.dig
───────────────────────────────────────────────────────────
✅ YAML Syntax: Valid
✅ Structure: Complete (all required sections present)
✅ Parallel Execution: Configured (_parallel: true)
✅ inc_log Task: Present
✅ Schedule: None (correct)
✅ SQL References: All 5 files exist
───────────────────────────────────────────────────────────
SQL FILE: queries/klaviyo_events.sql
───────────────────────────────────────────────────────────
✅ SQL Syntax: Valid (Presto/Trino)
✅ Template: Case 2 (Inc has extra columns)
✅ Table: client_src.klaviyo_events_histunion
✅ CREATE TABLE: Present
✅ UNION ALL: Correct structure
✅ Watermarks: Both hist and inc updates present
✅ NULL Handling: Correct for 'incremental_date'
✅ Schema Match: All 45 columns present in correct order
───────────────────────────────────────────────────────────
SQL FILE: queries/klaviyo_lists.sql
───────────────────────────────────────────────────────────
✅ SQL Syntax: Valid (Presto/Trino)
✅ Template: Case 3 (Full load)
⚠️ Table Type: FULL LOAD table
✅ DROP TABLE: Present
✅ CREATE TABLE: Correct (no IF NOT EXISTS)
✅ WHERE Clauses: Absent (correct for full load)
✅ UNION ALL: Correct structure
✅ Watermarks: Both hist and inc updates present
✅ Schema Match: All 52 columns present in correct order
... (for all SQL files)
```
### Issues Section (if any)
```
───────────────────────────────────────────────────────────
ISSUES FOUND
───────────────────────────────────────────────────────────
❌ ERROR: queries/shopify_products.sql
- Line 15: Column 'incremental_date' missing in CREATE TABLE
- Expected: 'incremental_date varchar' based on inc table schema
- Fix: Add 'incremental_date varchar' to CREATE TABLE statement
❌ ERROR: queries/users.sql
- Line 45: Using backticks around column "index"
- Fix: Replace `index` with "index" (Presto/Trino requires double quotes)
⚠️ WARNING: hist_union_runner.dig
- Line 25: Task +shopify_variants_histunion references non-existent SQL file
- Expected: queries/shopify_variants.sql
- Fix: Create missing SQL file or remove task
⚠️ WARNING: queries/onetrust_profiles.sql
- Missing watermark update for hist table
- Should have: INSERT INTO inc_log for onetrust_profiles_hist
- Fix: Add watermark update after UNION ALL insert
```
### Recommendations Section
```
───────────────────────────────────────────────────────────
RECOMMENDATIONS
───────────────────────────────────────────────────────────
💡 Consider adding these improvements:
1. Add comments to SQL files explaining schema differences
2. Document which tables are full load vs incremental
3. Add error handling tasks in workflow
4. Consider adding validation queries after inserts
💡 Performance optimizations:
1. Review parallel task limit based on TD account
2. Consider partitioning very large tables
3. Review watermark index on inc_log table
```
---
## Error Categories
### Critical Errors (Must Fix)
- ❌ Invalid YAML syntax in workflow
- ❌ Invalid SQL syntax
- ❌ Missing required sections (CREATE, INSERT, watermarks)
- ❌ Column count mismatch in UNION ALL
- ❌ Schema mismatch with inc table
- ❌ Referenced SQL files don't exist
- ❌ Inc or hist table doesn't exist in TD
### Warnings (Should Fix)
- ⚠️ Using backticks instead of double quotes
- ⚠️ Missing NULL handling for extra columns
- ⚠️ Wrong template for full load tables
- ⚠️ Watermark updates incomplete
- ⚠️ Column order doesn't match schema
### Info (Nice to Have)
- Could add more comments
- Could optimize query structure
- Could add data validation queries
---
## Usage Examples
### Example 1: Quick Syntax Check
```
User: "Validate my hist-union files"
I will:
1. Scan hist_union directory
2. Validate workflow YAML syntax
3. Validate all SQL file syntax
4. Check file references
5. Generate report with findings
```
### Example 2: Full Validation with Schema Check
```
User: "Validate hist-union files with full schema check"
I will:
1. Scan hist_union directory
2. Validate workflow and SQL syntax
3. Use MCP tool to get all table schemas
4. Compare CREATE TABLE with actual schemas
5. Verify column order and NULL handling
6. Check template compliance
7. Generate comprehensive report
```
### Example 3: Validate Specific File
```
User: "Validate just the klaviyo_events.sql file"
I will:
1. Read queries/klaviyo_events.sql
2. Validate SQL syntax
3. Check template structure
4. Optionally get schema via MCP
5. Generate focused report for this file
```
---
## Next Steps After Validation
### If Validation Passes
```bash
✅ All checks passed!
Next steps:
1. Deploy to Treasure Data: td wf push hist_union
2. Run workflow: td wf start hist_union hist_union_runner --session now
3. Monitor execution: td wf log <attempt_id>
4. Verify results in target tables
```
### If Validation Fails
```bash
❌ Validation found N errors and M warnings
Next steps:
1. Review validation report for details
2. Fix all critical errors (❌)
3. Address warnings (⚠️) if possible
4. Re-run validation
5. Deploy only after all errors are resolved
```
---
## Production-Ready Checklist
Before deploying, ensure:
- [ ] Workflow file YAML syntax is valid
- [ ] All SQL files have valid Presto/Trino syntax
- [ ] All referenced SQL files exist
- [ ] inc_log table creation task present
- [ ] Parallel execution configured
- [ ] No schedule blocks in workflow
- [ ] All CREATE TABLE statements match inc schemas
- [ ] Column order matches inc table schemas
- [ ] NULL handling present for schema differences
- [ ] Watermark updates present for all tables
- [ ] Full load tables use correct template
- [ ] No backticks in SQL (use double quotes)
- [ ] All table references are correct
---
**Ready to validate? Specify validation mode (syntax-only or full-schema) and I'll run comprehensive validation against all production quality gates.**

57 plugin.lock.json Normal file
View File

@@ -0,0 +1,57 @@
{
"$schema": "internal://schemas/plugin.lock.v1.json",
"pluginId": "gh:treasure-data/aps_claude_tools:plugins/cdp-histunion",
"normalized": {
"repo": null,
"ref": "refs/tags/v20251128.0",
"commit": "5ae10a994494d70f62a02be6ef0659a25067e058",
"treeHash": "ed94ed4c2dbc1b4bb47043e33502024e604b03c73fe756da0ad0cbe95fd8e4be",
"generatedAt": "2025-11-28T10:28:44.490600Z",
"toolVersion": "publish_plugins.py@0.2.0"
},
"origin": {
"remote": "git@github.com:zhongweili/42plugin-data.git",
"branch": "master",
"commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390",
"repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data"
},
"manifest": {
"name": "cdp-histunion",
"description": "Combine historical and incremental table data with schema validation and watermark tracking",
"version": null
},
"content": {
"files": [
{
"path": "README.md",
"sha256": "79d831819c2031ac485af7f653deff176a96f4dafa366a88e09037b7058e8ebd"
},
{
"path": "agents/cdp-histunion-expert.md",
"sha256": "a31528fe19f7709ec35c734e98da736046315aba65f4afb3a2dcb300b31fc448"
},
{
"path": ".claude-plugin/plugin.json",
"sha256": "9f2f6b5eb81cd3fceba234455d49869caf83aca86e341bfd57eeaaffa9dcf607"
},
{
"path": "commands/histunion-validate.md",
"sha256": "6c277a0b43b199f5c4edfc1f3781d8c813cfe38382465832bac19a37bcace57a"
},
{
"path": "commands/histunion-batch.md",
"sha256": "7f80feba16d769bb98fcddc1c6472aad97f0ad9e7247053ef6705272e7f53a4e"
},
{
"path": "commands/histunion-create.md",
"sha256": "9aade32c16ed4002a5e13fe7cc2967af5951c62bcb386d858545d1cbd0f8babf"
}
],
"dirSha256": "ed94ed4c2dbc1b4bb47043e33502024e604b03c73fe756da0ad0cbe95fd8e4be"
},
"security": {
"scannedAt": null,
"scannerVersion": null,
"flags": []
}
}