---
name: hybrid-execute-databricks
description: Execute Databricks ID unification workflow with convergence detection and monitoring
---

Execute Databricks ID Unification Workflow

Overview

Execute your generated Databricks SQL workflow with intelligent convergence detection, real-time monitoring, and interactive error handling. This command orchestrates the complete unification process from graph creation to master table generation.


What You Need

Required Inputs

  1. SQL Directory: Path to generated SQL files (e.g., databricks_sql/unify/)
  2. Server Hostname: Your Databricks workspace URL (e.g., your-workspace.cloud.databricks.com)
  3. HTTP Path: SQL Warehouse or cluster path (e.g., /sql/1.0/warehouses/abc123)
  4. Catalog: Target catalog name
  5. Schema: Target schema name

Authentication

Option 1: Personal Access Token (PAT)

  • Access token from Databricks workspace
  • Can be provided as an argument or via the DATABRICKS_TOKEN environment variable

Option 2: OAuth

  • Browser-based authentication
  • No token required; a browser window opens for login (see the connection sketch below)
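
Both options map onto the databricks-sql-connector Python package; a minimal connection sketch, assuming that package is installed (hostname and HTTP path are placeholders):

import os
from databricks import sql

# Option 1: PAT, read here from the DATABRICKS_TOKEN environment variable
connection = sql.connect(
    server_hostname="your-workspace.cloud.databricks.com",
    http_path="/sql/1.0/warehouses/abc123",
    access_token=os.environ["DATABRICKS_TOKEN"],
)

# Option 2: OAuth -- no token; the connector opens a browser for login
# connection = sql.connect(
#     server_hostname="your-workspace.cloud.databricks.com",
#     http_path="/sql/1.0/warehouses/abc123",
#     auth_type="databricks-oauth",
# )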

What I'll Do

Step 1: Connection Setup

  • Connect to your Databricks workspace
  • Validate credentials and permissions
  • Set catalog and schema context
  • Verify the SQL directory exists (sketched below)
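
A sketch of this setup, continuing from the connection above (catalog, schema, and directory names are placeholders):

import os

cursor = connection.cursor()

# Set the catalog and schema context for all subsequent statements
cursor.execute("USE CATALOG my_catalog")
cursor.execute("USE SCHEMA my_schema")

# Fail fast if the generated SQL directory is missing
sql_dir = "databricks_sql/unify/"
if not os.path.isdir(sql_dir):
    raise FileNotFoundError(f"SQL directory not found: {sql_dir}")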

Step 2: Execution Plan

Display an execution plan with:

  • All SQL files in execution order
  • File types (Setup, Loop Iteration, Enrichment, Master Table, etc.)
  • Estimated steps and dependencies (a plan-building sketch follows this list)
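
One way to build this plan is to sort the generated files by numeric prefix and classify each one by the prefix ranges listed under Execution Order below; a sketch:

from pathlib import Path

# Prefix ranges follow the Execution Order section of this document
def classify(prefix: int) -> str:
    if prefix <= 3:
        return "Setup"
    if prefix == 4:
        return "Loop Iteration"
    if prefix == 5:
        return "Canonicalization"
    if prefix == 6:
        return "Statistics"
    if prefix < 20:
        return "Enrichment"
    if prefix < 30:
        return "Master Table"
    return "Metadata"

for path in sorted(Path("databricks_sql/unify/").glob("*.sql")):
    print(f"{classify(int(path.name.split('_')[0])):16} {path.name}")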

Step 3: SQL Execution

I'll call the databricks-workflow-executor agent to:

  • Execute SQL files in proper sequence
  • Skip loop iteration files (handled separately)
  • Monitor progress with real-time feedback
  • Track row counts and execution times (sketched below)
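
A sketch of that sequencing; it assumes one statement per file (a real executor may need to split files on semicolons), and cursor.rowcount may be -1 when a statement reports no count:

import time
from pathlib import Path

for path in sorted(Path("databricks_sql/unify/").glob("*.sql")):
    if "loop_iteration" in path.name:  # deferred to the unify loop in Step 4
        continue
    start = time.monotonic()
    cursor.execute(path.read_text())
    elapsed = time.monotonic() - start
    print(f"✓ {path.name}: {elapsed:.1f}s, rows affected: {cursor.rowcount}")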

Step 4: Unify Loop with Convergence Detection

Intelligent Loop Execution:

Iteration 1:
  ✓ Execute unify SQL
  • Check convergence: 1500 records updated
  • Optimize Delta table
  → Continue to iteration 2

Iteration 2:
  ✓ Execute unify SQL
  • Check convergence: 450 records updated
  • Optimize Delta table
  → Continue to iteration 3

Iteration 3:
  ✓ Execute unify SQL
  • Check convergence: 0 records updated
  ✓ CONVERGED! Stop loop

Features:

  • Runs until convergence (updated_count = 0)
  • Maximum 30 iterations safety limit
  • Auto-optimization after each iteration
  • Creates an alias table (loop_final) for downstream processing; the control flow is sketched below
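
A minimal sketch of this control flow, assuming a run_iteration helper that executes the unify SQL and the count_updates check sketched under Convergence Detection below:

MAX_ITERATIONS = 30  # safety limit

for iteration in range(1, MAX_ITERATIONS + 1):
    run_iteration(cursor, iteration)            # execute the unify SQL
    updated = count_updates(cursor, iteration)  # convergence check
    cursor.execute(f"OPTIMIZE unified_id_graph_unify_loop_{iteration}")
    if updated == 0:
        print(f"✓ CONVERGED after {iteration} iterations")
        break
else:
    raise RuntimeError(f"No convergence within {MAX_ITERATIONS} iterations")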

Step 5: Post-Loop Processing

  • Execute canonicalization step
  • Generate result statistics
  • Enrich source tables with canonical IDs
  • Create master tables
  • Generate metadata and lookup tables

Step 6: Final Report

Provide:

  • Total execution time
  • Files processed successfully
  • Convergence statistics
  • Final table row counts
  • Next steps and recommendations

Command Usage

/cdp-hybrid-idu:hybrid-execute-databricks

I'll prompt you for:
- SQL directory path
- Databricks server hostname
- HTTP path
- Catalog and schema
- Authentication method

Advanced Mode

Provide all parameters upfront:

SQL directory: databricks_sql/unify/
Server hostname: your-workspace.cloud.databricks.com
HTTP path: /sql/1.0/warehouses/abc123
Catalog: my_catalog
Schema: my_schema
Auth type: pat (or oauth)
Access token: dapi... (if using PAT)

Execution Features

1. Convergence Detection

Algorithm:

SELECT COUNT(*) as updated_count FROM (
    SELECT leader_ns, leader_id, follower_ns, follower_id
    FROM current_iteration
    EXCEPT
    SELECT leader_ns, leader_id, follower_ns, follower_id
    FROM previous_iteration
) diff

Stops when: updated_count = 0
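
In the executor, this check might look like the following sketch, using the loop_N table naming shown elsewhere in this document:

def count_updates(cursor, iteration: int) -> int:
    """Count edges that changed between consecutive loop iterations."""
    cursor.execute(f"""
        SELECT COUNT(*) AS updated_count FROM (
            SELECT leader_ns, leader_id, follower_ns, follower_id
            FROM unified_id_graph_unify_loop_{iteration}
            EXCEPT
            SELECT leader_ns, leader_id, follower_ns, follower_id
            FROM unified_id_graph_unify_loop_{iteration - 1}
        ) diff
    """)
    return cursor.fetchone()[0]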

2. Delta Table Optimization

After major operations:

OPTIMIZE table_name

Benefits:

  • Compacts small files
  • Improves query performance
  • Reduces storage costs
  • Optimizes clustering

3. Interactive Error Handling

If an error occurs:

✗ File: 04_unify_loop_iteration_01.sql
Error: Table not found: source_table

Continue with remaining files? (y/n):

You can choose to:

  • Continue: Skip the failed file, continue with the rest
  • Stop: Halt execution for investigation (see the sketch below)
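
A sketch of that decision point, wrapped around each file execution:

try:
    cursor.execute(path.read_text())
    print(f"✓ {path.name}: Executed successfully")
except Exception as exc:
    print(f"✗ File: {path.name}")
    print(f"Error: {exc}")
    if input("Continue with remaining files? (y/n): ").strip().lower() != "y":
        raise  # Stop: halt execution for investigation
    # Continue: skip this file and move on to the rest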

4. Real-Time Monitoring

Track progress with:

  • ✓ Completed steps (green)
  • • Progress indicators (cyan)
  • ✗ Failed steps (red)
  • ⚠ Warnings (yellow)
  • Row counts and execution times

5. Alias Table Creation

After convergence, creates:

CREATE OR REPLACE TABLE catalog.schema.unified_id_graph_unify_loop_final
AS SELECT * FROM catalog.schema.unified_id_graph_unify_loop_3

This allows downstream SQL to reference loop_final regardless of the actual iteration count.
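
In the executor this can be a single templated statement over the converged iteration number; a sketch (catalog and schema are placeholders):

# 'iteration' is the loop index at convergence (3 in the example above)
cursor.execute(
    f"CREATE OR REPLACE TABLE {catalog}.{schema}.unified_id_graph_unify_loop_final "
    f"AS SELECT * FROM {catalog}.{schema}.unified_id_graph_unify_loop_{iteration}"
)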


Technical Details

Python Script Execution

The agent executes:

python3 scripts/databricks/databricks_sql_executor.py \
    databricks_sql/unify/ \
    --server-hostname your-workspace.cloud.databricks.com \
    --http-path /sql/1.0/warehouses/abc123 \
    --catalog my_catalog \
    --schema my_schema \
    --auth-type pat \
    --optimize-tables

Execution Order

  1. Setup Phase (01-03):

    • Create graph table (loop_0)
    • Extract and merge identities
    • Generate source statistics
  2. Unification Loop (04):

    • Run iterations until convergence
    • Check after EVERY iteration
    • Stop when updated_count = 0
    • Create loop_final alias
  3. Canonicalization (05):

    • Create canonical ID lookup
    • Create keys and tables metadata
    • Rename final graph table
  4. Statistics (06):

    • Generate result key statistics
    • Create histograms
    • Calculate coverage metrics
  5. Enrichment (10-19):

    • Add canonical IDs to source tables
    • Create enriched_* tables
  6. Master Tables (20-29):

    • Aggregate attributes
    • Apply priority rules
    • Create unified customer profiles
  7. Metadata (30-39):

    • Unification metadata
    • Filter lookup tables
    • Column lookup tables

Connection Management

  • Establishes single connection for entire workflow
  • Uses connection pooling for efficiency
  • Automatic reconnection on timeout
  • Proper cleanup on completion or error (sketched below)
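
databricks-sql-connector connections and cursors support the context-manager protocol, which is one way to get the cleanup guarantee above; a sketch (host, token, and run_workflow are placeholders):

from databricks import sql

# One connection for the entire workflow, closed on success or error
with sql.connect(server_hostname=host, http_path=http_path,
                 access_token=token) as connection:
    with connection.cursor() as cursor:
        run_workflow(cursor)  # every step shares this cursor
# Connection and cursor are closed here, even if run_workflow raised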

Example Execution

Input

SQL directory: databricks_sql/unify/
Server hostname: dbc-12345-abc.cloud.databricks.com
HTTP path: /sql/1.0/warehouses/6789abcd
Catalog: customer_data
Schema: id_unification
Auth type: pat

Output

✓ Connected to Databricks: dbc-12345-abc.cloud.databricks.com
• Using catalog: customer_data, schema: id_unification

Starting Databricks SQL Execution
• Catalog: customer_data
• Schema: id_unification
• Delta tables: ✓ enabled

Executing: 01_create_graph.sql
✓ 01_create_graph.sql: Executed successfully

Executing: 02_extract_merge.sql
✓ 02_extract_merge.sql: Executed successfully
• Rows affected: 125000

Executing: 03_source_key_stats.sql
✓ 03_source_key_stats.sql: Executed successfully

Executing Unify Loop Before Canonicalization

--- Iteration 1 ---
✓ Iteration 1 completed
• Rows processed: 125000
• Updated records: 1500
• Optimizing Delta table

--- Iteration 2 ---
✓ Iteration 2 completed
• Rows processed: 125000
• Updated records: 450
• Optimizing Delta table

--- Iteration 3 ---
✓ Iteration 3 completed
• Rows processed: 125000
• Updated records: 0
✓ Loop converged after 3 iterations

• Creating alias table for final iteration
✓ Alias table 'unified_id_graph_unify_loop_final' created

Executing: 05_canonicalize.sql
✓ 05_canonicalize.sql: Executed successfully

[... continues with enrichment, master tables, metadata ...]

Execution Complete
• Files processed: 18/18
• Final unified_id_lookup rows: 98,500

• Disconnected from Databricks

Monitoring and Troubleshooting

Check Execution Progress

During execution, you can monitor:

  • Databricks SQL Warehouse query history
  • Delta table sizes and row counts
  • Execution logs in Databricks workspace

Common Issues

Issue: Connection timeout
Solution: Check network access, verify credentials, ensure SQL Warehouse is running

Issue: Table not found
Solution: Verify catalog/schema permissions, check source table names in YAML

Issue: Loop doesn't converge
Solution: Check data quality, increase max_iterations, review key validation rules

Issue: Out of memory
Solution: Increase SQL Warehouse size, optimize clustering, reduce batch sizes

Issue: Permission denied
Solution: Verify catalog/schema permissions, check Unity Catalog access controls

Performance Optimization

  • Use larger SQL Warehouse for faster execution
  • Enable auto-scaling for variable workloads
  • Optimize Delta tables regularly
  • Use clustering on frequently joined columns (example below)
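
For example, compacting and clustering on a frequently joined column (table and column names are illustrative):

# Compact small files and co-locate rows on a hot join key
cursor.execute(
    "OPTIMIZE customer_data.id_unification.enriched_customer_profiles "
    "ZORDER BY (unified_id)"
)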

Post-Execution Validation

DO NOT RUN THESE VALIDATION QUERIES. JUST PRESENT THEM TO THE USER TO RUN ON DATABRICKS.

Check Coverage

SELECT
    COUNT(*) as total_records,
    COUNT(unified_id) as records_with_id,
    COUNT(unified_id) * 100.0 / COUNT(*) as coverage_percent
FROM catalog.schema.enriched_customer_profiles;

Verify Master Table

SELECT COUNT(*) as unified_customers
FROM catalog.schema.customer_master;

Review Statistics

SELECT * FROM catalog.schema.unified_id_result_key_stats
WHERE from_table = '*';

Success Criteria

Execution is successful when:

  • All SQL files processed without critical errors
  • Unification loop converged (updated_count = 0)
  • Canonical IDs generated for all eligible records
  • Enriched tables created successfully
  • Master tables populated with attributes
  • Coverage metrics meet expectations

Ready to execute your Databricks ID unification workflow?

Provide your SQL directory path and Databricks connection details to begin!