Files
gh-treasure-data-aps-claude…/commands/hybrid-execute-snowflake.md
2025-11-30 09:02:39 +08:00

402 lines
9.6 KiB
Markdown

---
name: hybrid-execute-snowflake
description: Execute Snowflake ID unification workflow with convergence detection and monitoring
---
# Execute Snowflake ID Unification Workflow
## Overview
Execute your generated Snowflake SQL workflow with intelligent convergence detection, real-time monitoring, and interactive error handling. This command orchestrates the complete unification process from graph creation to master table generation.
---
## What You Need
### Required Inputs
1. **SQL Directory**: Path to generated SQL files (e.g., `snowflake_sql/unify/`)
2. **Account**: Snowflake account name (e.g., `myaccount` from `myaccount.snowflakecomputing.com`)
3. **User**: Snowflake username
4. **Database**: Target database name
5. **Schema**: Target schema name
6. **Warehouse**: Compute warehouse name (defaults to `COMPUTE_WH`)
### Authentication
**Option 1: Password**
- Can be provided as argument or via environment variable `SNOWFLAKE_PASSWORD` via environment file (.env) `SNOWFLAKE_PASSWORD`
- Will prompt if not provided
**Option 2: SSO (externalbrowser)**
- Opens browser for authentication
- No password required
**Option 3: Key-Pair**
- Private key path via `SNOWFLAKE_PRIVATE_KEY_PATH`
- Passphrase via `SNOWFLAKE_PRIVATE_KEY_PASSPHRASE`
---
## What I'll Do
### Step 1: Connection Setup
- Connect to your Snowflake account
- Validate credentials and permissions
- Set database and schema context
- Verify SQL directory exists
- Activate warehouse
### Step 2: Execution Plan
Display execution plan with:
- All SQL files in execution order
- File types (Setup, Loop Iteration, Enrichment, Master Table, etc.)
- Estimated steps and dependencies
### Step 3: SQL Execution
I'll call the **snowflake-workflow-executor agent** to:
- Execute SQL files in proper sequence
- Skip loop iteration files (handled separately)
- Monitor progress with real-time feedback
- Track row counts and execution times
### Step 4: Unify Loop with Convergence Detection
**Intelligent Loop Execution**:
```
Iteration 1:
✓ Execute unify SQL
• Check convergence: 1500 records updated
→ Continue to iteration 2
Iteration 2:
✓ Execute unify SQL
• Check convergence: 450 records updated
→ Continue to iteration 3
Iteration 3:
✓ Execute unify SQL
• Check convergence: 0 records updated
✓ CONVERGED! Stop loop
```
**Features**:
- Runs until convergence (updated_count = 0)
- Maximum 30 iterations safety limit
- Creates alias table (loop_final) for downstream processing
### Step 5: Post-Loop Processing
- Execute canonicalization step
- Generate result statistics
- Enrich source tables with canonical IDs
- Create master tables
- Generate metadata and lookup tables
### Step 6: Final Report
Provide:
- Total execution time
- Files processed successfully
- Convergence statistics
- Final table row counts
- Next steps and recommendations
---
## Command Usage
### Interactive Mode (Recommended)
```
/cdp-hybrid-idu:hybrid-execute-snowflake
I'll prompt you for:
- SQL directory path
- Snowflake account name
- Username
- Database and schema
- Warehouse name
- Authentication method
```
### Advanced Mode
Provide all parameters upfront:
```
SQL directory: snowflake_sql/unify/
Account: myaccount
User: myuser
Database: my_database
Schema: my_schema
Warehouse: COMPUTE_WH
Password: (will prompt if not in environment)
```
---
## Execution Features
### 1. Convergence Detection
**Algorithm**:
```sql
SELECT COUNT(*) as updated_count FROM (
SELECT leader_ns, leader_id, follower_ns, follower_id
FROM current_iteration
EXCEPT
SELECT leader_ns, leader_id, follower_ns, follower_id
FROM previous_iteration
) diff
```
**Stops when**: updated_count = 0
### 2. Interactive Error Handling
If an error occurs:
```
✗ File: 04_unify_loop_iteration_01.sql
Error: Table not found: source_table
Continue with remaining files? (y/n):
```
You can choose to:
- Continue: Skip failed file, continue with rest
- Stop: Halt execution for investigation
### 3. Real-Time Monitoring
Track progress with:
- ✓ Completed steps (green)
- • Progress indicators (cyan)
- ✗ Failed steps (red)
- ⚠ Warnings (yellow)
- Row counts and execution times
### 4. Alias Table Creation
After convergence, creates:
```sql
CREATE OR REPLACE TABLE database.schema.unified_id_graph_unify_loop_final
AS SELECT * FROM database.schema.unified_id_graph_unify_loop_3
```
This allows downstream SQL to reference `loop_final` regardless of actual iteration count.
---
## Technical Details
### Python Script Execution
The agent executes:
```bash
python3 scripts/snowflake/snowflake_sql_executor.py \
snowflake_sql/unify/ \
--account myaccount \
--user myuser \
--database my_database \
--schema my_schema \
--warehouse COMPUTE_WH
```
### Execution Order
1. **Setup Phase** (01-03):
- Create graph table (loop_0)
- Extract and merge identities
- Generate source statistics
2. **Unification Loop** (04):
- Run iterations until convergence
- Check after EVERY iteration
- Stop when updated_count = 0
- Create loop_final alias
3. **Canonicalization** (05):
- Create canonical ID lookup
- Create keys and tables metadata
- Rename final graph table
4. **Statistics** (06):
- Generate result key statistics
- Create histograms
- Calculate coverage metrics
5. **Enrichment** (10-19):
- Add canonical IDs to source tables
- Create enriched_* tables
6. **Master Tables** (20-29):
- Aggregate attributes
- Apply priority rules
- Create unified customer profiles
7. **Metadata** (30-39):
- Unification metadata
- Filter lookup tables
- Column lookup tables
### Connection Management
- Establishes single connection for entire workflow
- Uses connection pooling for efficiency
- Automatic reconnection on timeout
- Proper cleanup on completion or error
---
## Example Execution
### Input
```
SQL directory: snowflake_sql/unify/
Account: myorg-myaccount
User: analytics_user
Database: customer_data
Schema: id_unification
Warehouse: LARGE_WH
```
### Output
```
✓ Connected to Snowflake: myorg-myaccount
• Using database: customer_data, schema: id_unification
Starting Snowflake SQL Execution
• Database: customer_data
• Schema: id_unification
Executing: 01_create_graph.sql
✓ 01_create_graph.sql: Executed successfully
Executing: 02_extract_merge.sql
✓ 02_extract_merge.sql: Executed successfully
• Rows affected: 125000
Executing: 03_source_key_stats.sql
✓ 03_source_key_stats.sql: Executed successfully
Executing Unify Loop Before Canonicalization
--- Iteration 1 ---
✓ Iteration 1 completed
• Rows processed: 125000
• Updated records: 1500
--- Iteration 2 ---
✓ Iteration 2 completed
• Rows processed: 125000
• Updated records: 450
--- Iteration 3 ---
✓ Iteration 3 completed
• Rows processed: 125000
• Updated records: 0
✓ Loop converged after 3 iterations
• Creating alias table for final iteration
✓ Alias table 'unified_id_graph_unify_loop_final' created
Executing: 05_canonicalize.sql
✓ 05_canonicalize.sql: Executed successfully
[... continues with enrichment, master tables, metadata ...]
Execution Complete
• Files processed: 18/18
• Final unified_id_lookup rows: 98,500
• Disconnected from Snowflake
```
---
## Monitoring and Troubleshooting
### Check Execution Progress
During execution, you can monitor:
- Snowflake query history
- Table sizes and row counts
- Warehouse utilization
- Execution logs
### Common Issues
**Issue**: Connection timeout
**Solution**: Check network access, verify credentials, ensure warehouse is running
**Issue**: Table not found
**Solution**: Verify database/schema permissions, check source table names in YAML
**Issue**: Loop doesn't converge
**Solution**: Check data quality, increase max_iterations, review key validation rules
**Issue**: Warehouse suspended
**Solution**: Ensure auto-resume is enabled, manually resume warehouse if needed
**Issue**: Permission denied
**Solution**: Verify database/schema permissions, check role assignments
### Performance Optimization
- Use larger warehouse for faster execution (L, XL, 2XL, etc.)
- Enable multi-cluster warehouse for concurrency
- Use clustering keys on frequently joined columns
- Monitor query profiles for optimization opportunities
---
## Post-Execution Validation
**DO NOT RUN THESE VALIDATION. JUST PRESENT TO USER TO RUN ON SNOWFLAKE**
### Check Coverage
```sql
SELECT
COUNT(*) as total_records,
COUNT(unified_id) as records_with_id,
COUNT(unified_id) * 100.0 / COUNT(*) as coverage_percent
FROM database.schema.enriched_customer_profiles;
```
### Verify Master Table
```sql
SELECT COUNT(*) as unified_customers
FROM database.schema.customer_master;
```
### Review Statistics
```sql
SELECT * FROM database.schema.unified_id_result_key_stats
WHERE from_table = '*';
```
---
## Success Criteria
Execution successful when:
- ✅ All SQL files processed without critical errors
- ✅ Unification loop converged (updated_count = 0)
- ✅ Canonical IDs generated for all eligible records
- ✅ Enriched tables created successfully
- ✅ Master tables populated with attributes
- ✅ Coverage metrics meet expectations
---
## Authentication Examples
### Using Password
```bash
export SNOWFLAKE_PASSWORD='your_password'
/cdp-hybrid-idu:hybrid-execute-snowflake
```
### Using SSO
```bash
/cdp-hybrid-idu:hybrid-execute-snowflake
# Will prompt: Use SSO authentication? (y/n): y
# Opens browser for authentication
```
### Using Key-Pair
```bash
export SNOWFLAKE_PRIVATE_KEY_PATH='/path/to/key.p8'
export SNOWFLAKE_PRIVATE_KEY_PASSPHRASE='passphrase'
/cdp-hybrid-idu:hybrid-execute-snowflake
```
---
**Ready to execute your Snowflake ID unification workflow?**
Provide your SQL directory path and Snowflake connection details to begin!