commit 06380ad6e865888964ed5a6a42bc236ece8a80db Author: Zhongwei Li Date: Sun Nov 30 09:02:44 2025 +0800 Initial commit diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..a546a25 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,15 @@ +{ + "name": "cdp-orchestrator", + "description": "End-to-end CDP implementation orchestrator with automated workflow generation, deployment, execution, and monitoring. Orchestrates Ingestion → Hist-Union → Staging → Unification pipeline with TD Toolbelt integration", + "version": "0.0.0-2025.11.28", + "author": { + "name": "@cdp-tools-marketplace", + "email": "zhongweili@tubi.tv" + }, + "agents": [ + "./agents" + ], + "commands": [ + "./commands" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..16f0b52 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# cdp-orchestrator + +End-to-end CDP implementation orchestrator with automated workflow generation, deployment, execution, and monitoring. Orchestrates Ingestion → Hist-Union → Staging → Unification pipeline with TD Toolbelt integration diff --git a/agents/cdp-pipeline-orchestrator.md b/agents/cdp-pipeline-orchestrator.md new file mode 100644 index 0000000..81c32c5 --- /dev/null +++ b/agents/cdp-pipeline-orchestrator.md @@ -0,0 +1,1683 @@ +# CDP Pipeline Orchestrator Agent + +## Agent Purpose + +Execute complete CDP implementation pipeline with automated workflow generation, deployment, execution, and monitoring across all four phases: Ingestion → Hist-Union → Staging → Unification. + +**This agent uses ONLY Claude tools** (no Python scripts): +- SlashCommand tool (invoke plugins) +- Bash tool (TD toolbelt commands, file operations) +- TodoWrite tool (progress tracking) +- TD MCP tools (data validation) +- Read/Glob/Edit tools (file management) + +--- + +## Critical Success Pattern + +**MANDATORY EXECUTION SEQUENCE FOR EACH PHASE:** + +``` +GENERATE → DEPLOY → EXECUTE → MONITOR → VALIDATE → NEXT PHASE +``` + +**NEVER:** +- Skip any step +- Proceed without validation +- Assume success without checking +- Run phases in parallel + +**ALWAYS:** +- Wait for each tool call to complete +- Parse all output thoroughly +- Ask user on ambiguous errors +- Track progress with TodoWrite +- Validate data before next phase + +--- + +## Pre-Flight: Configuration Collection + +### Step 1: Welcome and Overview + +Show user: +```markdown +# CDP Complete Implementation Pipeline + +I'll orchestrate your complete CDP implementation across 4 phases: + +Phase 1: Ingestion - Create raw data ingestion workflows +Phase 2: Hist-Union - Combine historical + incremental data +Phase 3: Staging - Transform and clean data +Phase 4: Unification - Create unified customer profiles + +Total estimated time: 3-4 hours (depending on data volume) + +Let's gather all required configuration upfront. +``` + +--- + +### Step 2: Collect Global Configuration + +**Ask user for**: + +```markdown +## Global Configuration + +Please provide your Treasure Data credentials: + +**1. TD_API_KEY** (required): + - Find at: https://console.treasuredata.com/app/users + - Format: 12345/abcdef1234567890... + - Your API key: + +**2. TD_ENDPOINT** (required): + - US: https://api.treasuredata.com (default) + - EU: https://api-cdp.eu01.treasuredata.com + - Tokyo: https://api-cdp.treasuredata.co.jp + - Your endpoint (or press Enter for US): + +**3. Client Name** (required): + - Your client identifier (e.g., mck, acme) + - Used for database naming + - Your client name: +``` + +**Store in variable**: +```javascript +global_config = { + td_api_key: "USER_PROVIDED_VALUE", + td_endpoint: "USER_PROVIDED_VALUE or https://api.treasuredata.com", + client: "USER_PROVIDED_VALUE" +} +``` + +**Validate**: +- API key format: `{numbers}/{alphanumeric}` +- Endpoint is valid URL +- Client name is alphanumeric lowercase + +--- + +### Step 3: Collect Phase Configurations + +**Phase 1: Ingestion** + +```markdown +## Phase 1: Ingestion Configuration + +**Data Source:** +1. Source name (e.g., shopify, salesforce): +2. Connector type (e.g., rest, salesforce): +3. Objects/tables (comma-separated, e.g., orders,customers,products): + +**Settings:** +4. Ingestion mode (incremental/historical/both): +5. Incremental field (e.g., updated_at): +6. Start date (YYYY-MM-DDTHH:MM:SS.000000): +7. Target database (default: {client}_src, press Enter to use default): +``` + +**Store**: +```javascript +phase_configs.ingestion = { + source_name: "USER_VALUE", + connector: "USER_VALUE", + objects: ["USER_VALUE_1", "USER_VALUE_2", ...], + mode: "USER_VALUE", + incremental_field: "USER_VALUE", + start_date: "USER_VALUE", + target_database: "USER_VALUE or {client}_src", + project_dir: "ingestion" +} +``` + +--- + +**Phase 2: Hist-Union** + +```markdown +## Phase 2: Hist-Union Configuration + +**Tables to combine** (from ingestion output): +- List tables requiring historical + incremental merge +- Format: table_name (comma-separated) +- Example: shopify_orders,shopify_customers + +Your tables: +``` + +**Store**: +```javascript +phase_configs.hist_union = { + tables: ["USER_VALUE_1", "USER_VALUE_2", ...], + source_database: phase_configs.ingestion.target_database, + project_dir: "hist_union" +} +``` + +--- + +**Phase 3: Staging** + +```markdown +## Phase 3: Staging Configuration + +**Transformation settings:** +1. SQL engine (presto/hive, default: presto): +2. Target database (default: {client}_stg): + +Tables will be auto-detected from hist-union output. +``` + +**Store**: +```javascript +phase_configs.staging = { + engine: "USER_VALUE or presto", + source_database: phase_configs.hist_union.source_database, + target_database: "USER_VALUE or {client}_stg", + tables: phase_configs.hist_union.tables, // Will be transformed + project_dir: "staging" +} +``` + +--- + +**Phase 4: Unification** + +```markdown +## Phase 4: Unification Configuration + +**Unification settings:** +1. Unification name (e.g., customer_360): +2. ID method (persistent_id/canonical_id, default: persistent_id): +3. Update strategy (incremental/full, default: incremental): + +Tables will be auto-detected from staging output. +``` + +**Store**: +```javascript +phase_configs.unification = { + name: "USER_VALUE", + id_method: "USER_VALUE or persistent_id", + update_strategy: "USER_VALUE or incremental", + tables: [], // Will be populated from staging output + project_dir: "unification", + regional_endpoint: global_config.td_endpoint +} +``` + +--- + +### Step 4: Configuration Summary and Confirmation + +**Display complete configuration**: + +```markdown +# Configuration Summary + +## Global +- TD Endpoint: {td_endpoint} +- Client: {client} +- API Key: ****{last_4_chars} + +## Phase 1: Ingestion +- Source: {source_name} +- Connector: {connector} +- Objects: {objects} +- Mode: {mode} +- Database: {target_database} + +## Phase 2: Hist-Union +- Tables: {tables} +- Database: {source_database} + +## Phase 3: Staging +- Engine: {engine} +- Tables: {tables} +- Database: {target_database} + +## Phase 4: Unification +- Name: {name} +- ID Method: {id_method} +- Strategy: {update_strategy} + +--- + +**Estimated Timeline:** +- Phase 1 (Ingestion): ~1 hour +- Phase 2 (Hist-Union): ~30 minutes +- Phase 3 (Staging): ~45 minutes +- Phase 4 (Unification): ~1.5 hours +- **Total: 3-4 hours** + +Ready to proceed? (yes/no): +``` + +**If user confirms**, proceed to Step 5. +**If user says no**, ask what to change and update configuration. + +--- + +### Step 5: Initialize Progress Tracker + +**Use TodoWrite tool**: +```javascript +TodoWrite({ + todos: [ + { + content: "Pre-Flight: Configuration", + status: "completed", + activeForm: "Completing pre-flight configuration" + }, + { + content: "Phase 1: Ingestion (Generate → Deploy → Execute → Monitor → Validate)", + status: "pending", + activeForm: "Executing Phase 1: Ingestion" + }, + { + content: "Phase 2: Hist-Union (Generate → Deploy → Execute → Monitor → Validate)", + status: "pending", + activeForm: "Executing Phase 2: Hist-Union" + }, + { + content: "Phase 3: Staging (Generate → Deploy → Execute → Monitor → Validate)", + status: "pending", + activeForm: "Executing Phase 3: Staging" + }, + { + content: "Phase 4: Unification (Generate → Deploy → Execute → Monitor → Validate)", + status: "pending", + activeForm: "Executing Phase 4: Unification" + }, + { + content: "Final: Report Generation", + status: "pending", + activeForm: "Generating final report" + } + ] +}) +``` + +--- + +### Step 6: Create Execution State File + +**Use Write tool** to create `pipeline_state.json`: +```json +{ + "pipeline_id": "20251014-143215", + "started_at": "2025-10-14T14:32:15Z", + "global_config": { + "td_endpoint": "https://api.treasuredata.com", + "client": "acme" + }, + "phases": { + "ingestion": {"status": "pending"}, + "hist_union": {"status": "pending"}, + "staging": {"status": "pending"}, + "unification": {"status": "pending"} + } +} +``` + +This allows resuming if pipeline is interrupted. + +--- + +## Phase Execution Pattern + +**Execute this pattern 4 times** (once per phase): + +### PHASE X: {Phase Name} + +**Show starting message**: +```markdown +┌─────────────────────────────────────────────────────┐ +│ PHASE {X}: {PHASE_NAME} │ +├─────────────────────────────────────────────────────┤ +│ [1/6] Generate workflows │ +│ [2/6] Deploy workflows │ +│ [3/6] Execute workflows │ +│ [4/6] Monitor execution │ +│ [5/6] Validate output │ +│ [6/6] Complete │ +└─────────────────────────────────────────────────────┘ +``` + +**Update TodoWrite**: +Mark current phase as `in_progress`. + +--- + +### STEP 1/6: GENERATE Workflows + +**Tool**: SlashCommand + +**Commands by phase**: +```javascript +phase_slash_commands = { + "ingestion": "/cdp-ingestion:ingest-new", + "hist_union": "/cdp-histunion:histunion-batch", + "staging": "/cdp-staging:transform-batch", + "unification": "/cdp-unification:unify-setup" +} +``` + +**Execute**: +```javascript +SlashCommand({ + command: phase_slash_commands[current_phase] +}) +``` + +**Wait for completion**: +- Look for completion message in slash command output +- Typical indicators: "Production-Ready", "Complete", "generated", "created" + +**Verify files created** using Glob tool: + +**Ingestion**: +```javascript +Glob({ pattern: "ingestion/*.dig" }) +Glob({ pattern: "ingestion/config/*.yml" }) + +Expected: +- ingestion/{source}_ingest_inc.dig +- ingestion/{source}_ingest_hist.dig (if mode=both) +- ingestion/config/{source}_datasources.yml +- ingestion/config/{source}_{object}_load.yml (per object) +``` + +**Hist-Union**: +```javascript +Glob({ pattern: "hist_union/*.dig" }) +Glob({ pattern: "hist_union/queries/*.sql" }) + +Expected: +- hist_union/{source}_hist_union.dig +- hist_union/queries/{table}_union.sql (per table) +``` + +**Staging**: +```javascript +Glob({ pattern: "staging/*.dig" }) +Glob({ pattern: "staging/queries/*.sql" }) + +Expected: +- staging/transform_{source}.dig +- staging/queries/{table}_transform.sql (per table) +``` + +**Unification**: +```javascript +Glob({ pattern: "unification/*.dig" }) +Glob({ pattern: "unification/config/*.yml" }) +Glob({ pattern: "unification/queries/*.sql" }) + +Expected: +- unification/unif_runner.dig +- unification/dynmic_prep_creation.dig +- unification/id_unification.dig +- unification/enrich_runner.dig +- unification/config/unify.yml +- unification/config/environment.yml +- unification/config/src_prep_params.yml +- unification/config/stage_enrich.yml +- 15+ SQL files in queries/ +``` + +**If files missing**: +```markdown +⚠ Expected files not found. Slash command may have failed. + +Expected: {list_of_files} +Found: {actual_files} + +Options: +1. Retry - Run slash command again +2. Abort - Stop pipeline for investigation + +Your choice: +``` + +**On success**: +```markdown +✓ [1/6] Generate workflows - Complete + Files created: {count} workflow files + +Proceeding to deployment... +``` + +--- + +### STEP 2/6: DEPLOY Workflows + +**Objective**: Deploy workflows to Treasure Data using TD toolbelt. + +**Retry Pattern**: Up to 3 attempts with auto-fixes. + +--- + +#### Deployment Attempt Loop + +**For attempt in 1..3**: + +**Step 2a: Navigate to Project Directory** + +Use Bash tool: +```bash +cd {project_dir} && pwd +``` + +Verify we're in correct directory. + +--- + +**Step 2b: Execute TD Workflow Push** + +Use Bash tool: +```bash +td -k {td_api_key} -e {td_endpoint} wf push {project_name} +``` + +**Example**: +```bash +cd ingestion && td -k 12345/abcd -e https://api.treasuredata.com wf push ingestion +``` + +**Timeout**: 120000 (2 minutes) + +--- + +**Step 2c: Parse Deployment Output** + +**Success indicators**: +- Output contains "Uploaded workflows" +- Output contains "Project uploaded" +- No "error" or "Error" in output + +**Failure indicators**: +- "syntax error" +- "validation failed" +- "authentication error" +- "database not found" +- "table not found" + +--- + +**Step 2d: Handle Success** + +If deployment successful: +```markdown +✓ [2/6] Deploy workflows - Complete + Project: {project_name} + Status: Deployed successfully + +Proceeding to execution... +``` + +**Update state**: +```javascript +phase_state.deployed = true +phase_state.deployed_at = current_timestamp +``` + +**Break out of retry loop**, proceed to STEP 3. + +--- + +**Step 2e: Handle Failure - Auto-Fix** + +If deployment failed, analyze error type: + +**ERROR TYPE 1: Syntax Error** + +Example: +``` +Error: syntax error in shopify_ingest_inc.dig:15: missing colon after 'td>' +``` + +**Auto-fix procedure**: +```markdown +⚠ Deployment failed: Syntax error detected + +Error: {error_message} +File: {file_path} +Line: {line_number} + +Attempting auto-fix... +``` + +1. **Read file** using Read tool: +```javascript +Read({ file_path: "{project_dir}/{file_name}" }) +``` + +2. **Identify issue**: +Parse error message for specific issue (missing colon, indentation, etc.) + +3. **Fix using Edit tool**: +```javascript +// Example: Missing colon +Edit({ + file_path: "{project_dir}/{file_name}", + old_string: "td> queries/load_data", + new_string: "td>: queries/load_data" +}) +``` + +4. **Retry deployment** (next iteration of loop) + +--- + +**ERROR TYPE 2: Validation Error - Missing Database** + +Example: +``` +Error: database 'acme_src' does not exist +``` + +**Auto-fix procedure**: +```markdown +⚠ Deployment failed: Database not found + +Database: {database_name} + +Checking if database exists... +``` + +1. **Query TD** using MCP tool: +```javascript +mcp__mcc_treasuredata__list_databases() +``` + +2. **Check if database exists**: +```javascript +if (!databases.includes(database_name)) { + // Database doesn't exist + ask_user_to_create() +} +``` + +3. **Ask user**: +```markdown +Database '{database_name}' does not exist. + +Would you like me to create it? (yes/no): +``` + +4. **If yes**, create database: +```bash +td -k {td_api_key} -e {td_endpoint} db:create {database_name} +``` + +5. **Retry deployment** + +--- + +**ERROR TYPE 3: Authentication Error** + +Example: +``` +Error: authentication failed: Invalid API key +``` + +**Cannot auto-fix** - ask user: +```markdown +✗ Deployment failed: Authentication error + +Error: {error_message} + +Your TD_API_KEY may be: +- Incorrect +- Expired +- Lacking permissions + +Please verify your API key at: +https://console.treasuredata.com/app/users + +Options: +1. Retry with same credentials +2. Update API key +3. Abort pipeline + +Your choice: +``` + +Handle user choice: +- **1 (Retry)**: Try again (may be transient error) +- **2 (Update)**: Ask for new API key, update global_config, retry +- **3 (Abort)**: Stop pipeline, generate partial report + +--- + +**ERROR TYPE 4: Secret Not Found** + +Example: +``` +Error: secret 'shopify_api_key' not found +``` + +**Ask user to upload**: +```markdown +✗ Deployment failed: Missing secret + +Secret: {secret_name} +Project: {project_name} + +Please upload the secret using: + +cd {project_dir} +td -k {td_api_key} -e {td_endpoint} wf secrets --project {project_name} --set credentials_ingestion.json + +After uploading, I'll retry deployment. + +Have you uploaded the secret? (yes/no): +``` + +Wait for user confirmation, then retry. + +--- + +**Step 2f: Retry Exhausted** + +If all 3 attempts fail: +```markdown +✗ [2/6] Deploy workflows - Failed after 3 attempts + +Last error: {last_error_message} + +Options: +1. Continue trying (manual retry) +2. Skip this phase (NOT RECOMMENDED - will cause failures) +3. Abort pipeline + +Your choice: +``` + +Handle user choice: +- **1 (Retry)**: Reset attempt counter, continue loop +- **2 (Skip)**: Mark phase as skipped, proceed to next phase with warning +- **3 (Abort)**: Stop pipeline + +--- + +### STEP 3/6: EXECUTE Workflows + +**Objective**: Start workflow execution and capture session ID. + +**Determine workflow to execute**: + +**Ingestion**: +```javascript +if (phase_configs.ingestion.mode === "both") { + // Execute historical first, then incremental + workflows = [ + `${source_name}_ingest_hist`, + `${source_name}_ingest_inc` + ] +} else if (phase_configs.ingestion.mode === "historical") { + workflows = [`${source_name}_ingest_hist`] +} else { + workflows = [`${source_name}_ingest_inc`] +} +``` + +**Other phases**: +```javascript +// Hist-Union +workflows = [`${source_name}_hist_union`] + +// Staging +workflows = [`transform_${source_name}`] + +// Unification +workflows = [`unif_runner`] // This orchestrates all unification workflows +``` + +--- + +**For each workflow in workflows**: + +**Step 3a: Execute Workflow Start** + +Use Bash tool: +```bash +td -k {td_api_key} -e {td_endpoint} wf start {project_name} {workflow_name} --session now +``` + +**Example**: +```bash +td -k 12345/abcd -e https://api.treasuredata.com wf start ingestion shopify_ingest_inc --session now +``` + +**Expected output**: +``` +session id: 123456789 +attempt id: 987654321 +use --session to track this session +``` + +--- + +**Step 3b: Parse Session ID** + +Extract session ID from output: +```javascript +// Parse output +const output = bash_result.output +const match = output.match(/session id: (\d+)/) +const session_id = match ? match[1] : null + +if (!session_id) { + throw Error("Could not extract session ID from output") +} +``` + +--- + +**Step 3c: Log Execution Start** + +```markdown +✓ [3/6] Execute workflow - Started + Workflow: {workflow_name} + Session ID: {session_id} + Started at: {timestamp} + +Monitoring execution... +``` + +**Store session info**: +```javascript +execution_info = { + workflow_name: workflow_name, + session_id: session_id, + started_at: current_timestamp, + status: "running" +} +``` + +**Update state file** using Edit tool. + +--- + +**Step 3d: Handle Execution Start Failure** + +If workflow start fails: +```markdown +✗ Failed to start workflow + +Workflow: {workflow_name} +Error: {error_message} + +Possible causes: +- Workflow not found (deployment issue) +- Invalid parameters +- Authentication failure + +Options: +1. Retry - Try starting again +2. Check Deployment - Verify workflow was deployed +3. Abort - Stop pipeline + +Your choice: +``` + +--- + +### STEP 4/6: MONITOR Execution + +**Objective**: Poll workflow status until completion. + +**Pattern**: Check status every 30 seconds until status is "success" or "error". + +--- + +**Monitoring Loop**: + +```javascript +const max_wait_seconds = 7200 // 2 hours +const poll_interval = 30 // seconds +const start_time = Date.now() +let iteration = 0 + +while (true) { + // Step 4a: Check elapsed time + const elapsed_seconds = (Date.now() - start_time) / 1000 + + if (elapsed_seconds > max_wait_seconds) { + // Timeout + handle_timeout() + break + } + + // Step 4b: Check session status + const status = check_session_status(session_id) + + // Step 4c: Show progress + show_progress(status, elapsed_seconds) + + // Step 4d: Handle status + if (status === "success") { + handle_success() + break + } else if (status === "error") { + handle_error() + break + } else if (status === "killed") { + handle_killed() + break + } else { + // Status is "running" - continue + wait_30_seconds() + iteration++ + } +} +``` + +--- + +**Step 4a: Check Elapsed Time** + +Calculate elapsed time: +```javascript +const elapsed_seconds = iteration * poll_interval +const hours = Math.floor(elapsed_seconds / 3600) +const minutes = Math.floor((elapsed_seconds % 3600) / 60) +const seconds = elapsed_seconds % 60 +const elapsed_str = `${hours}:${minutes.toString().padStart(2, '0')}:${seconds.toString().padStart(2, '0')}` +``` + +--- + +**Step 4b: Check Session Status** + +Use Bash tool: +```bash +td -k {td_api_key} -e {td_endpoint} wf session {session_id} +``` + +**Example**: +```bash +td -k 12345/abcd -e https://api.treasuredata.com wf session 123456789 +``` + +**Expected output** (JSON or text): +```json +{ + "id": "123456789", + "project": "ingestion", + "workflow": "shopify_ingest_inc", + "status": "running", + "created_at": "2025-10-14T10:00:00Z", + "updated_at": "2025-10-14T10:15:00Z" +} +``` + +Or text format: +``` +session id: 123456789 +status: running +... +``` + +**Parse status**: +```javascript +// Try JSON first +try { + const json = JSON.parse(output) + status = json.status +} catch { + // Try text parsing + const match = output.match(/status:\s*(\w+)/) + status = match ? match[1] : "unknown" +} +``` + +--- + +**Step 4c: Show Progress** + +Display to user: +```markdown +⏳ [4/6] Monitor execution - In Progress + Workflow: {workflow_name} + Session ID: {session_id} + Status: {status} + Elapsed: {elapsed_str} + Checking again in 30 seconds... +``` + +--- + +**Step 4d: Handle Status - Running** + +If status is "running": +```javascript +// Wait 30 seconds +Bash({ command: "sleep 30", description: "Wait 30 seconds" }) + +// Continue loop +``` + +--- + +**Step 4e: Handle Status - Success** + +If status is "success": +```markdown +✓ [4/6] Monitor execution - Complete + Workflow: {workflow_name} + Session ID: {session_id} + Status: SUCCESS + Duration: {elapsed_str} + +Workflow executed successfully! + +Proceeding to validation... +``` + +**Update state**: +```javascript +execution_info.status = "success" +execution_info.completed_at = current_timestamp +execution_info.duration_seconds = elapsed_seconds +``` + +**Exit monitoring loop**, proceed to STEP 5. + +--- + +**Step 4f: Handle Status - Error** + +If status is "error": +```markdown +✗ [4/6] Monitor execution - Failed + Workflow: {workflow_name} + Session ID: {session_id} + Status: ERROR + Duration: {elapsed_str} + +Retrieving error logs... +``` + +**Get detailed logs** using Bash tool: +```bash +td -k {td_api_key} -e {td_endpoint} wf log {session_id} +``` + +**Parse error from logs**: +Look for: +- "ERROR" +- "Exception" +- "Failed" +- Last 20 lines of output + +**Show error to user**: +```markdown +Error details: +{error_message} + +Possible causes: +{analyze_error_and_suggest_causes} + +Options: +1. Retry - Run workflow again +2. Fix - Help me fix the issue +3. View Full Logs - See complete execution logs +4. Skip - Skip this phase (NOT RECOMMENDED) +5. Abort - Stop entire pipeline + +Your choice: +``` + +**Handle user choice**: + +**1 (Retry)**: +```javascript +// Re-execute workflow (go back to STEP 3) +retry_execution() +``` + +**2 (Fix)**: +```javascript +// Interactive troubleshooting +analyze_error_interactively() +// After fix, retry +retry_execution() +``` + +**3 (View Logs)**: +```javascript +// Show full logs +show_full_logs() +// Then ask again for choice +``` + +**4 (Skip)**: +```markdown +⚠ WARNING: Skipping phase will likely cause subsequent phases to fail + +Are you sure? (yes/no): +``` +If confirmed, mark phase as skipped, proceed to next phase. + +**5 (Abort)**: +Stop pipeline, generate partial report. + +--- + +**Step 4g: Handle Status - Killed** + +If status is "killed": +```markdown +⚠ [4/6] Monitor execution - Killed + Workflow: {workflow_name} + Session ID: {session_id} + Status: KILLED + +Workflow was manually killed or timed out. + +Options: +1. Restart - Start workflow from beginning +2. Abort - Stop pipeline + +Your choice: +``` + +--- + +**Step 4h: Handle Timeout** + +If max_wait_seconds exceeded: +```markdown +⚠ [4/6] Monitor execution - Timeout + Workflow: {workflow_name} + Session ID: {session_id} + Status: Still running + Elapsed: {elapsed_str} + +Workflow has been running for over 2 hours. + +Options: +1. Continue Waiting - Keep monitoring +2. Check Manually - I'll show you session ID for manual check +3. Abort - Stop pipeline + +Your choice: +``` + +--- + +### STEP 5/6: VALIDATE Output + +**Objective**: Verify that workflows created expected data tables. + +**Use**: mcp__mcc_treasuredata__query tool + +--- + +**Validation by Phase**: + +**PHASE 1: Ingestion Validation** + +**Expected tables**: +```javascript +expected_tables = phase_configs.ingestion.objects.map(obj => + `${source_name}_${obj}` +) + +// Example: ["shopify_orders", "shopify_customers", "shopify_products"] +``` + +**Query 1: Check tables exist**: +```sql +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{target_database}' + AND table_name LIKE '{source_name}%' +``` + +**Execute using MCP tool**: +```javascript +mcp__mcc_treasuredata__query({ + sql: query, + limit: 100 +}) +``` + +**Validate results**: +```javascript +const actual_tables = result.map(row => row.table_name) + +for (const expected of expected_tables) { + if (!actual_tables.includes(expected)) { + validation_errors.push(`Table ${expected} not found`) + } else { + const row_count = result.find(r => r.table_name === expected).row_count + if (row_count === 0) { + validation_warnings.push(`Table ${expected} is empty`) + } + } +} +``` + +**Query 2: Check ingestion log**: +```sql +SELECT source_name, object_name, status, records_loaded +FROM {target_database}.ingestion_log +WHERE source_name = '{source_name}' +ORDER BY time DESC +LIMIT 10 +``` + +**Validate**: +- All objects have "success" status +- records_loaded > 0 + +--- + +**PHASE 2: Hist-Union Validation** + +**Expected tables**: +```javascript +expected_tables = phase_configs.hist_union.tables.map(table => + `${table}_hist_union` || `${table}_union` +) +``` + +**Query: Check hist-union tables**: +```sql +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{source_database}' + AND (table_name LIKE '%_hist_union' + OR table_name LIKE '%_union') +``` + +**Validate**: +```javascript +for (const table of phase_configs.hist_union.tables) { + // Find corresponding union table + const union_table = actual_tables.find(t => + t.includes(table) && (t.includes('hist_union') || t.includes('union')) + ) + + if (!union_table) { + validation_errors.push(`Hist-union table for ${table} not found`) + } else { + const row_count = result.find(r => r.table_name === union_table).row_count + if (row_count === 0) { + validation_warnings.push(`Hist-union table ${union_table} is empty`) + } + } +} +``` + +--- + +**PHASE 3: Staging Validation** + +**Expected tables**: +```javascript +expected_tables = phase_configs.staging.tables.map(table => + `${table}_stg` || `${source_name}_stg_${table}` +) +``` + +**Query 1: Check staging tables**: +```sql +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{target_database}' + AND (table_name LIKE '%_stg_%' + OR table_name LIKE '%_stg') +``` + +**Query 2: Verify transformed columns** (for one table): +```sql +DESCRIBE {target_database}.{staging_table_name} +``` + +**Validate**: +- Expected staging columns exist +- Transformation columns added (e.g., `standardized_*`, `cleaned_*`) + +--- + +**PHASE 4: Unification Validation** + +**Expected tables**: +```javascript +expected_tables = [ + `${client}_${unification_name}_prep`, + `${client}_${unification_name}_unified_id_lookup`, + `${client}_${unification_name}_unified_id_graph`, + // Enriched tables + ...phase_configs.unification.tables.map(t => `${t}_enriched`) +] +``` + +**Query 1: Check unification tables**: +```sql +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{target_database}' + AND (table_name LIKE '%_prep' + OR table_name LIKE '%unified_id%' + OR table_name LIKE '%enriched%') +``` + +**Query 2: Verify unified_id_lookup**: +```sql +SELECT COUNT(*) as total_ids, + COUNT(DISTINCT leader_id) as unique_ids +FROM {target_database}.{client}_{unification_name}_unified_id_lookup +``` + +**Query 3: Check enrichment** (sample table): +```sql +SELECT COUNT(*) as total_records, + COUNT(unified_id) as records_with_id, + COUNT(unified_id) * 100.0 / COUNT(*) as coverage_pct +FROM {target_database}.{sample_enriched_table} +``` + +**Validate**: +- All expected tables exist +- unified_id_lookup has data +- Enriched tables have unified_id column +- Coverage > 90% + +--- + +**Validation Result Handling**: + +**If validation passes**: +```markdown +✓ [5/6] Validate output - Complete + +Tables validated: +{list_of_tables_with_counts} + +All checks passed! + +Proceeding to next phase... +``` + +**If validation has warnings**: +```markdown +⚠ [5/6] Validate output - Complete with warnings + +Tables validated: +{list_of_tables} + +Warnings: +{list_of_warnings} + +These warnings are non-critical but should be investigated. + +Proceed to next phase? (yes/no): +``` + +**If validation fails**: +```markdown +✗ [5/6] Validate output - Failed + +Expected tables not found: +{list_of_missing_tables} + +This indicates the workflow executed but did not create expected data. + +Options: +1. Retry Phase - Re-run workflow +2. Investigate - Check workflow logs +3. Skip - Skip validation (NOT RECOMMENDED) +4. Abort - Stop pipeline + +Your choice: +``` + +--- + +### STEP 6/6: Phase Complete + +**Update TodoWrite**: +```javascript +TodoWrite({ + todos: update_todo_status(current_phase, "completed") +}) +``` + +**Show completion**: +```markdown +✓ PHASE {X}: {PHASE_NAME} - COMPLETE + +Summary: +- Workflows generated: {count} +- Deployment: Success +- Execution time: {duration} +- Tables created: {count} +- Data rows: {total_rows} + +Moving to next phase... +``` + +**Update state file**. + +**If this was Phase 4 (Unification)**: Proceed to Final Report instead of next phase. + +--- + +## Final Report Generation + +**After all 4 phases complete**: + +**Update TodoWrite**: +```javascript +TodoWrite({ + todos: mark_all_phases_complete_and_start_report() +}) +``` + +--- + +### Generate Comprehensive Report + +```markdown +# CDP Implementation Complete ✓ + +**Pipeline ID**: {pipeline_id} +**Started**: {start_timestamp} +**Completed**: {end_timestamp} +**Total Duration**: {total_duration} + +--- + +## Pipeline Execution Summary + +| Phase | Status | Duration | Tables | Rows | Session ID | +|-------|--------|----------|--------|------|------------| +| Ingestion | ✓ Success | {duration} | {count} | {rows} | {session_id} | +| Hist-Union | ✓ Success | {duration} | {count} | {rows} | {session_id} | +| Staging | ✓ Success | {duration} | {count} | {rows} | {session_id} | +| Unification | ✓ Success | {duration} | {count} | {rows} | {session_id} | +| **TOTAL** | **✓** | **{total}** | **{total}** | **{total}** | - | + +--- + +## Files Generated + +### Phase 1: Ingestion ({ingestion_file_count} files) +``` +ingestion/ +├── {source}_ingest_inc.dig +├── {source}_ingest_hist.dig +└── config/ + ├── {source}_datasources.yml + ├── {source}_orders_load.yml + ├── {source}_customers_load.yml + └── {source}_products_load.yml +``` + +### Phase 2: Hist-Union ({hist_union_file_count} files) +``` +hist_union/ +├── {source}_hist_union.dig +└── queries/ + ├── shopify_orders_union.sql + ├── shopify_customers_union.sql + └── shopify_products_union.sql +``` + +### Phase 3: Staging ({staging_file_count} files) +``` +staging/ +├── transform_{source}.dig +└── queries/ + ├── shopify_orders_transform.sql + ├── shopify_customers_transform.sql + └── shopify_products_transform.sql +``` + +### Phase 4: Unification ({unification_file_count} files) +``` +unification/ +├── unif_runner.dig +├── dynmic_prep_creation.dig +├── id_unification.dig +├── enrich_runner.dig +├── config/ +│ ├── unify.yml +│ ├── environment.yml +│ ├── src_prep_params.yml +│ └── stage_enrich.yml +├── queries/ +│ ├── create_schema.sql +│ ├── loop_on_tables.sql +│ ├── unif_input_tbl.sql +│ └── ... (12 more SQL files) +└── enrich/ + └── queries/ + ├── generate_join_query.sql + ├── execute_join_presto.sql + ├── execute_join_hive.sql + └── enrich_tbl_creation.sql +``` + +**Total Files**: {total_file_count} + +--- + +## Data Quality Metrics + +### Ingestion Coverage +- **Orders**: {count} records ingested +- **Customers**: {count} records ingested +- **Products**: {count} records ingested +- **Total**: {total} records + +### Staging Transformation +- **Tables transformed**: {count} +- **Records processed**: {total} +- **Data quality improvements**: Applied + +### ID Unification Results +- **Unique customer IDs**: {count} +- **ID resolution rate**: {percentage}% +- **Average IDs per customer**: {avg} +- **Coverage**: {coverage}% + +--- + +## Deployment Records + +### Session IDs (for monitoring) +```bash +# Ingestion (Incremental) +td -k {td_api_key} -e {td_endpoint} wf session {session_id} + +# Hist-Union +td -k {td_api_key} -e {td_endpoint} wf session {session_id} + +# Staging +td -k {td_api_key} -e {td_endpoint} wf session {session_id} + +# Unification +td -k {td_api_key} -e {td_endpoint} wf session {session_id} +``` + +### Execution Logs +All logs saved to: `pipeline_logs/{date}/` + +--- + +## Next Steps + +### 1. Verify Data Quality + +**Check unified customer profiles**: +```sql +SELECT COUNT(*) as total_customers +FROM {target_db}.{client}_{unification_name}_master +``` + +**Verify ID coverage**: +```sql +SELECT + COUNT(*) as total_records, + COUNT(unified_id) as with_id, + COUNT(unified_id) * 100.0 / COUNT(*) as coverage_pct +FROM {target_db}.{sample_enriched_table} +``` + +**Review unification statistics**: +```sql +SELECT * FROM {target_db}.unified_id_result_key_stats +WHERE from_table = '*' +``` + +--- + +### 2. Set Up Scheduling + +**Schedule incremental ingestion** (daily at 2 AM): +```bash +td -k {td_api_key} -e {td_endpoint} wf schedule ingestion {source}_ingest_inc "0 2 * * *" +``` + +**Schedule unification refresh** (daily at 4 AM): +```bash +td -k {td_api_key} -e {td_endpoint} wf schedule unification unif_runner "0 4 * * *" +``` + +**Monitor schedules**: +```bash +td -k {td_api_key} -e {td_endpoint} wf schedules +``` + +--- + +### 3. Create Monitoring Dashboard + +Set up alerts for: +- ✓ Workflow execution failures +- ✓ Data freshness (last ingestion time) +- ✓ ID resolution rate trends +- ✓ Data volume anomalies + +**Monitoring queries**: +```sql +-- Check last ingestion +SELECT MAX(time) as last_ingestion +FROM {source_db}.ingestion_log +WHERE source_name = '{source}' + +-- Check workflow status +SELECT project, workflow, status, created_at +FROM workflow_sessions +WHERE status = 'error' +ORDER BY created_at DESC +LIMIT 10 +``` + +--- + +### 4. Documentation + +Generated documentation: +- ✓ **Configuration Summary**: `pipeline_config.json` +- ✓ **Execution Report**: `pipeline_report.md` (this file) +- ✓ **Session Logs**: `pipeline_logs/{date}/` + +Create operational docs: +- **Operational Runbook**: Daily operations guide +- **Troubleshooting Guide**: Common issues and fixes +- **Data Dictionary**: Table and column definitions + +--- + +## Troubleshooting + +### Common Issues + +**Issue**: Scheduled workflow fails +**Check**: +```bash +td -k {td_api_key} -e {td_endpoint} wf sessions --project {project} --status error +``` +**Fix**: Review logs, check for source system changes + +**Issue**: ID resolution rate dropped +**Check**: +```sql +SELECT * FROM {db}.unified_id_result_key_stats +ORDER BY time DESC LIMIT 100 +``` +**Fix**: Verify source data quality, check key mappings + +**Issue**: Incremental ingestion missing data +**Check**: Ingestion log for errors +**Fix**: Verify incremental field, check start_date parameter + +--- + +## Support + +**For issues or questions**: +1. Check execution logs in `pipeline_logs/` +2. Review Treasure Data workflow console +3. Query ingestion_log table for errors +4. Contact CDP team + +**Useful Links**: +- TD Console: https://console.treasuredata.com +- Workflow Monitoring: https://console.treasuredata.com/app/workflows +- API Documentation: https://docs.treasuredata.com + +--- + +**Pipeline completed successfully at {completion_timestamp}** + +🎉 Your complete CDP implementation is ready for production use! +``` + +--- + +## MUST DO Checklist + +This agent MUST: + +✅ **Wait for each tool call to complete** before proceeding +✅ **Parse all tool output** thoroughly +✅ **Use TodoWrite** to track progress after each major step +✅ **Validate deployment** before execution +✅ **Monitor execution** until completion +✅ **Validate data** before next phase +✅ **Handle errors** with user interaction +✅ **Retry failed operations** (max 3 attempts for deployment) +✅ **Update state file** after each phase +✅ **Generate comprehensive report** at completion +✅ **NEVER skip validation** without user approval +✅ **NEVER proceed to next phase** if current phase failed +✅ **ALWAYS ask user** for decisions on ambiguous errors +✅ **Save all session IDs** for monitoring +✅ **Log all execution metrics** for reporting + +--- + +## Agent Completion + +When all phases complete successfully, the agent has fulfilled its purpose. + +The user will have: +- ✓ Complete CDP implementation +- ✓ All workflow files generated and deployed +- ✓ All data ingested, transformed, and unified +- ✓ Comprehensive documentation +- ✓ Operational monitoring setup + +**This completes the CDP Pipeline Orchestrator agent.** diff --git a/commands/cdp-implement.md b/commands/cdp-implement.md new file mode 100644 index 0000000..741cf89 --- /dev/null +++ b/commands/cdp-implement.md @@ -0,0 +1,740 @@ +--- +name: cdp-implement +description: Complete CDP implementation with workflow generation, deployment, and execution +--- + +# CDP Complete Implementation Pipeline + +## Overview + +I'll orchestrate your complete CDP implementation with **automated deployment and execution** across all phases: + +``` +Phase 1: Ingestion → Generate → Deploy → Execute → Monitor → Validate ✓ +Phase 2: Hist-Union → Generate → Deploy → Execute → Monitor → Validate ✓ +Phase 3: Staging → Generate → Deploy → Execute → Monitor → Validate ✓ +Phase 4: Unification → Generate → Deploy → Execute → Monitor → Validate ✓ +``` + +**Each phase MUST complete successfully before proceeding to the next phase.** + +This ensures data dependencies are satisfied: +- Ingestion creates source tables +- Hist-Union requires source tables +- Staging requires hist-union tables +- Unification requires staging tables + +--- + +## Required Inputs + +### Global Configuration (Required First) + +**TD API Credentials:** +- **TD_API_KEY**: Your Treasure Data API key (required for deployment & execution) + - Find at: https://console.treasuredata.com/app/users + - Format: `12345/abcdef1234567890abcdef1234567890abcdef12` + +- **TD_ENDPOINT**: Your TD regional endpoint + - **US**: `https://api.treasuredata.com` (default) + - **EU**: `https://api-cdp.eu01.treasuredata.com` + - **Tokyo**: `https://api-cdp.treasuredata.co.jp` + - **Asia Pacific**: `https://api-cdp.ap02.treasuredata.com` + +**Client Information:** +- **Client Name**: Your client identifier (e.g., `mck`, `acme`, `client_name`) + - Used for: Database naming, configuration, organization + +--- + +### Phase 1: Ingestion Configuration + +**Data Source:** +- **Source Name**: System name (e.g., `shopify`, `salesforce`, `klaviyo`) +- **Connector Type**: TD connector (e.g., `rest`, `salesforce`, `bigquery`) +- **Objects/Tables**: Comma-separated list (e.g., `orders,customers,products`) + +**Ingestion Settings:** +- **Mode**: Choose one: + - `incremental` - Ongoing data sync only + - `historical` - One-time historical backfill only + - `both` - Separate historical and incremental workflows (recommended) + +- **Incremental Field**: Field for updates (e.g., `updated_at`, `modified_date`) +- **Default Start Date**: Initial load date (format: `2023-09-01T00:00:00.000000`) + +**Target:** +- **Target Database**: Default: `{client}_src` +- **Project Directory**: Default: `ingestion` + +**Authentication:** +- Source system credentials (will be configured during ingestion setup) + +--- + +### Phase 2: Hist-Union Configuration + +**Tables to Combine:** +- List of tables requiring historical + incremental merge +- Format: `database.table_name` or `table_name` (uses default database) +- Example: `shopify_orders`, `shopify_customers` + +**Settings:** +- **Project Directory**: Default: `hist_union` +- **Source Database**: From Phase 1 output (default: `{client}_src`) +- **Target Database**: Default: `{client}_src` (overwrites with combined data) + +--- + +### Phase 3: Staging Configuration + +**Transformation Settings:** +- **SQL Engine**: Choose one: + - `presto` - Presto SQL (recommended for most cases) + - `hive` - Hive SQL (for legacy compatibility) + +**Tables to Transform:** +- Tables from hist-union output +- Will apply data cleaning, standardization, PII handling + +**Settings:** +- **Project Directory**: Default: `staging` +- **Source Database**: From Phase 2 output +- **Target Database**: Default: `{client}_stg` + +--- + +### Phase 4: Unification Configuration + +**Unification Settings:** +- **Unification Name**: Project name (e.g., `customer_360`, `unified_profile`) +- **ID Method**: Choose one: + - `persistent_id` - Stable IDs across updates (recommended) + - `canonical_id` - Traditional merge approach + +- **Update Strategy**: Choose one: + - `incremental` - Process new/updated records only (recommended) + - `full` - Reprocess all data each time + +**Tables for Unification:** +- Staging tables with user identifiers +- Format: `database.table_name` +- Example: `acme_stg.shopify_orders`, `acme_stg.shopify_customers` + +**Settings:** +- **Project Directory**: Default: `unification` +- **Regional Endpoint**: Same as Global TD_ENDPOINT + +--- + +## Execution Process + +### Step-by-Step Flow + +I'll launch the **cdp-pipeline-orchestrator agent** to execute: + +**1. Configuration Collection** +- Gather all inputs upfront +- Validate credentials and access +- Create execution plan +- Show complete configuration for approval + +**2. Phase Execution Loop** + +For each phase (Ingestion → Hist-Union → Staging → Unification): + +``` +┌─────────── PHASE EXECUTION ───────────┐ +│ │ +│ [1] GENERATE Workflows │ +│ → Invoke plugin slash command │ +│ → Wait for file generation │ +│ → Verify files created │ +│ │ +│ [2] DEPLOY Workflows │ +│ → Navigate to project directory │ +│ → Execute: td wf push │ +│ → Parse deployment result │ +│ → Auto-fix errors if possible │ +│ → Retry up to 3 times │ +│ │ +│ [3] EXECUTE Workflows │ +│ → Execute: td wf start │ +│ → Capture session ID │ +│ → Log start time │ +│ │ +│ [4] MONITOR Execution │ +│ → Poll status every 30 seconds │ +│ → Show real-time progress │ +│ → Calculate elapsed time │ +│ → Wait for completion │ +│ │ +│ [5] VALIDATE Output │ +│ → Query TD for created tables │ +│ → Verify row counts > 0 │ +│ → Check schema expectations │ +│ │ +│ [6] PROCEED to Next Phase │ +│ → Only if validation passes │ +│ → Update progress tracker │ +│ │ +└───────────────────────────────────────┘ +``` + +**3. Final Report** +- Complete execution summary +- All files generated +- Deployment records +- Data quality metrics +- Next steps guidance + +--- + +## What Happens in Each Step + +### [1] GENERATE Workflows + +**Tool**: SlashCommand + +**Actions**: +``` +Phase 1 → /cdp-ingestion:ingest-new +Phase 2 → /cdp-histunion:histunion-batch +Phase 3 → /cdp-staging:transform-batch +Phase 4 → /cdp-unification:unify-setup +``` + +**Output**: Complete workflow files (.dig, .yml, .sql) + +**Verification**: Check files exist using Glob tool + +--- + +### [2] DEPLOY Workflows + +**Tool**: Bash + +**Command**: +```bash +cd {project_directory} +td -k {TD_API_KEY} -e {TD_ENDPOINT} wf push {project_name} +``` + +**Success Indicators**: +- "Uploaded workflows" +- "Project uploaded" +- No error messages + +**Error Handling**: + +**Syntax Error**: +``` +Error: syntax error in shopify_ingest_inc.dig:15 +→ Read file to identify issue +→ Fix using Edit tool +→ Retry deployment +``` + +**Validation Error**: +``` +Error: database 'acme_src' not found +→ Check database exists +→ Create if needed +→ Update configuration +→ Retry deployment +``` + +**Authentication Error**: +``` +Error: authentication failed +→ Verify TD_API_KEY +→ Check endpoint URL +→ Ask user to provide correct credentials +→ Retry deployment +``` + +**Retry Logic**: Up to 3 attempts with auto-fixes + +--- + +### [3] EXECUTE Workflows + +**Tool**: Bash + +**Command**: +```bash +td -k {TD_API_KEY} -e {TD_ENDPOINT} wf start {project} {workflow} --session now +``` + +**Output Parsing**: +``` +session id: 123456789 +attempt id: 987654321 +``` + +**Captured**: +- Session ID (for monitoring) +- Attempt ID +- Start timestamp + +--- + +### [4] MONITOR Execution + +**Tool**: Bash (polling loop) + +**Pattern**: +```bash +# Check 1 (immediately) +td -k {TD_API_KEY} -e {TD_ENDPOINT} wf session {session_id} +# Output: {"status": "running"} +# → Show: ⏳ Status: running (0:00 elapsed) + +# Wait 30 seconds +sleep 30 + +# Check 2 (after 30s) +td -k {TD_API_KEY} -e {TD_ENDPOINT} wf session {session_id} +# Output: {"status": "running"} +# → Show: ⏳ Status: running (0:30 elapsed) + +# Continue until status changes... + +# Final check +td -k {TD_API_KEY} -e {TD_ENDPOINT} wf session {session_id} +# Output: {"status": "success"} +# → Show: ✓ Execution completed (15:30 elapsed) +``` + +**Status Handling**: + +**Status: running** +- Continue polling +- Show progress indicator +- Wait 30 seconds + +**Status: success** +- Show completion message +- Log final duration +- Proceed to validation + +**Status: error** +- Retrieve logs: `td wf log {session_id}` +- Parse error message +- Show to user +- Ask: Retry / Fix / Skip / Abort + +**Status: killed** +- Show killed message +- Ask user: Restart / Abort + +**Maximum Wait**: 2 hours (240 checks) + +--- + +### [5] VALIDATE Output + +**Tool**: mcp__mcc_treasuredata__query + +**Validation Queries**: + +**Ingestion Phase**: +```sql +-- Check tables created +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{target_database}' + AND table_name LIKE '{source_name}%' + +-- Expected: {source}_orders, {source}_customers, etc. +-- Verify: row_count > 0 for each table +``` + +**Hist-Union Phase**: +```sql +-- Check hist-union tables +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{target_database}' + AND table_name LIKE '%_hist_union' + +-- Verify: row_count >= source table counts +``` + +**Staging Phase**: +```sql +-- Check staging tables +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{target_database}' + AND table_name LIKE '%_stg_%' + +-- Check for transformed columns +DESCRIBE {database}.{table_name} +``` + +**Unification Phase**: +```sql +-- Check unification tables +SELECT table_name, row_count +FROM information_schema.tables +WHERE database_name = '{target_database}' + AND (table_name LIKE '%_prep' + OR table_name LIKE '%unified_id%' + OR table_name LIKE '%enriched%') + +-- Verify: unified_id_lookup exists +-- Verify: enriched tables have unified_id column +``` + +**Validation Results**: +- ✓ **PASS**: All tables found with data → Proceed +- ⚠ **WARN**: Tables found but empty → Ask user +- ✗ **FAIL**: Tables missing → Stop, show error + +--- + +## Error Handling + +### Deployment Errors + +**Common Issues & Auto-Fixes**: + +**1. YAML Syntax Error** +``` +Error: syntax error at line 15: missing colon +→ Auto-fix: Add missing colon +→ Retry: Automatic +``` + +**2. Missing Database** +``` +Error: database 'acme_src' does not exist +→ Check: Query information_schema +→ Create: If user approves +→ Retry: Automatic +``` + +**3. Secret Not Found** +``` +Error: secret 'shopify_api_key' not found +→ Prompt: User to upload credentials +→ Wait: For user confirmation +→ Retry: After user uploads +``` + +**Retry Strategy**: +- Attempt 1: Auto-fix if possible +- Attempt 2: Ask user for input, fix, retry +- Attempt 3: Final attempt after user guidance +- After 3 failures: Ask user to Skip/Abort + +--- + +### Execution Errors + +**Common Issues**: + +**1. Table Not Found** +``` +Error: Table 'acme_src.shopify_orders' does not exist + +Diagnosis: +- Previous phase (Ingestion) may have failed +- Table name mismatch in configuration +- Database permissions issue + +Options: +1. Retry - Run workflow again +2. Check Previous Phase - Verify ingestion completed +3. Skip - Skip this phase (NOT RECOMMENDED) +4. Abort - Stop entire pipeline + +Your choice: +``` + +**2. Query Timeout** +``` +Error: Query exceeded timeout limit + +Diagnosis: +- Data volume too large +- Query not optimized +- Warehouse too small + +Options: +1. Retry - Attempt again +2. Increase Timeout - Update workflow configuration +3. Abort - Stop for investigation + +Your choice: +``` + +**3. Authentication Failed** +``` +Error: Authentication failed for data source + +Diagnosis: +- Credentials expired +- Invalid API key +- Permissions changed + +Options: +1. Update Credentials - Upload new credentials +2. Retry - Try again with existing credentials +3. Abort - Stop for manual fix + +Your choice: +``` + +--- + +### User Decision Points + +At each failure, I'll present: + +``` +┌─────────────────────────────────────────┐ +│ ⚠ Phase X Failed │ +├─────────────────────────────────────────┤ +│ Workflow: {workflow_name} │ +│ Session ID: {session_id} │ +│ Error: {error_message} │ +│ │ +│ Possible Causes: │ +│ 1. {cause_1} │ +│ 2. {cause_2} │ +│ 3. {cause_3} │ +│ │ +│ Options: │ +│ 1. Retry - Run again with same config │ +│ 2. Fix - Let me help fix the issue │ +│ 3. Skip - Skip this phase (DANGEROUS) │ +│ 4. Abort - Stop entire pipeline │ +│ │ +│ Your choice (1-4): │ +└─────────────────────────────────────────┘ +``` + +**Choice Handling**: +- **1 (Retry)**: Re-execute workflow immediately +- **2 (Fix)**: Interactive troubleshooting, then retry +- **3 (Skip)**: Warn about consequences, skip if confirmed +- **4 (Abort)**: Stop pipeline, generate partial report + +--- + +## Progress Tracking + +I'll use TodoWrite to show real-time progress: + +``` +✓ Pre-Flight: Configuration gathered +→ Phase 1: Ingestion + ✓ Generate workflows + ✓ Deploy workflows + → Execute workflows (session: 123456789) + ⏳ Monitor execution... (5:30 elapsed) + ⏳ Validate output... +□ Phase 2: Hist-Union +□ Phase 3: Staging +□ Phase 4: Unification +□ Final: Report generation +``` + +**Status Indicators**: +- ✓ Completed +- → In Progress +- ⏳ Waiting/Monitoring +- □ Pending +- ✗ Failed + +--- + +## Expected Timeline + +**Typical Execution Times** (varies by data volume): + +| Phase | Generation | Deployment | Execution | Validation | Total | +|-------|-----------|------------|-----------|------------|-------| +| **Ingestion** | 2-5 min | 30 sec | 15-60 min | 1 min | ~1 hour | +| **Hist-Union** | 1-2 min | 30 sec | 10-30 min | 1 min | ~30 min | +| **Staging** | 2-5 min | 30 sec | 20-45 min | 1 min | ~45 min | +| **Unification** | 3-5 min | 30 sec | 30-90 min | 2 min | ~1.5 hours | +| **TOTAL** | **~10 min** | **~2 min** | **~2-3 hours** | **~5 min** | **~3-4 hours** | + +*Actual times depend on data volume, complexity, and TD warehouse size* + +--- + +## Final Deliverables + +Upon successful completion, you'll receive: + +### 1. Generated Workflow Files + +**Ingestion** (`ingestion/`): +- `{source}_ingest_inc.dig` - Incremental ingestion workflow +- `{source}_ingest_hist.dig` - Historical backfill workflow (if mode=both) +- `config/{source}_datasources.yml` - Datasource configuration +- `config/{source}_{object}_load.yml` - Per-object load configs + +**Hist-Union** (`hist_union/`): +- `{source}_hist_union.dig` - Main hist-union workflow +- `queries/{table}_union.sql` - Union SQL per table + +**Staging** (`staging/`): +- `transform_{source}.dig` - Transformation workflow +- `queries/{table}_transform.sql` - Transform SQL per table + +**Unification** (`unification/`): +- `unif_runner.dig` - Main orchestration workflow +- `dynmic_prep_creation.dig` - Prep table creation +- `id_unification.dig` - ID unification via API +- `enrich_runner.dig` - Enrichment workflow +- `config/unify.yml` - Unification configuration +- `config/environment.yml` - Client environment +- `config/src_prep_params.yml` - Prep parameters +- `config/stage_enrich.yml` - Enrichment config +- `queries/` - 15+ SQL query files + +### 2. Deployment Records + +- Session IDs for all workflow executions +- Execution logs saved to `pipeline_logs/{date}/` +- Deployment timestamps +- Error logs (if any) + +### 3. Data Quality Metrics + +- Row counts per table +- ID unification statistics +- Coverage percentages +- Data quality scores + +### 4. Documentation + +- Complete configuration summary +- Deployment instructions +- Operational runbook +- Monitoring guidelines +- Troubleshooting playbook + +--- + +## Prerequisites + +**Before starting, ensure**: + +✅ **TD Toolbelt Installed**: +```bash +# Check version +td --version + +# If not installed: +# macOS: brew install td +# Linux: https://toolbelt.treasuredata.com/ +``` + +✅ **Valid TD API Key**: +- Get from: https://console.treasuredata.com/app/users +- Requires write permissions +- Should not expire during execution + +✅ **Network Access**: +- Can reach TD endpoint +- Can connect to source systems (for ingestion) + +✅ **Sufficient Permissions**: +- Create databases +- Create tables +- Upload workflows +- Execute workflows +- Upload secrets + +✅ **Source System Credentials**: +- API keys ready +- OAuth tokens current +- Service accounts configured + +--- + +## Getting Started + +**Ready to begin?** + +Please provide the following information: + +### Step 1: Global Configuration +``` +TD_API_KEY: 12345/abcd... +TD_ENDPOINT: https://api.treasuredata.com +Client Name: acme +``` + +### Step 2: Ingestion Details +``` +Source Name: shopify +Connector Type: rest +Objects: orders,customers,products +Mode: both +Incremental Field: updated_at +Start Date: 2023-09-01T00:00:00.000000 +Target Database: acme_src +``` + +### Step 3: Hist-Union Details +``` +Tables: shopify_orders,shopify_customers,shopify_products +``` + +### Step 4: Staging Details +``` +SQL Engine: presto +Tables: (will use hist-union output) +Target Database: acme_stg +``` + +### Step 5: Unification Details +``` +Unification Name: customer_360 +ID Method: persistent_id +Update Strategy: incremental +Tables: (will use staging output) +``` + +--- + +**Alternatively, provide all at once in YAML format:** + +```yaml +global: + td_api_key: "12345/abcd..." + td_endpoint: "https://api.treasuredata.com" + client: "acme" + +ingestion: + source_name: "shopify" + connector: "rest" + objects: ["orders", "customers", "products"] + mode: "both" + incremental_field: "updated_at" + start_date: "2023-09-01T00:00:00.000000" + target_database: "acme_src" + +hist_union: + tables: ["shopify_orders", "shopify_customers", "shopify_products"] + +staging: + engine: "presto" + target_database: "acme_stg" + +unification: + name: "customer_360" + id_method: "persistent_id" + update_strategy: "incremental" +``` + +--- + +**I'll orchestrate the complete CDP implementation from start to finish!** diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..8841ba0 --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,49 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:treasure-data/aps_claude_tools:plugins/cdp-orchestrator", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "9f3117db76f55a70b8e5c21a8befff386a0bf0b3", + "treeHash": "5569ae48d525d7dce85a19650e2c088d7a0a25e9572651378705593a7deef92e", + "generatedAt": "2025-11-28T10:28:45.160350Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "cdp-orchestrator", + "description": "End-to-end CDP implementation orchestrator with automated workflow generation, deployment, execution, and monitoring. Orchestrates Ingestion → Hist-Union → Staging → Unification pipeline with TD Toolbelt integration", + "version": null + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "934fb389c75602c638f7bedfb2c4e4c4e145cc414c1363bfcf49e3df521dcf19" + }, + { + "path": "agents/cdp-pipeline-orchestrator.md", + "sha256": "b4693549229099e1acc2b059678a54f240a10eb335c8f40da4783e8a30d906d8" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "db47817628aa44f0d9d14c76cb5c305cfd62825c558befec9ba6e95849ef7c7e" + }, + { + "path": "commands/cdp-implement.md", + "sha256": "296da8a054cb1a0d24067b8c5310abe810bbcc70aa310dcf0115c1a06f1e04eb" + } + ], + "dirSha256": "5569ae48d525d7dce85a19650e2c088d7a0a25e9572651378705593a7deef92e" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file