From 73652b90f9ae5255871452d1b21c0ae94637fc65 Mon Sep 17 00:00:00 2001 From: Zhongwei Li Date: Sun, 30 Nov 2025 09:03:06 +0800 Subject: [PATCH] Initial commit --- .claude-plugin/plugin.json | 14 + README.md | 3 + plugin.lock.json | 60 ++ skills/dbt/SKILL.md | 817 ++++++++++++++ skills/dbt/macros/override_dbt_trino.sql | 163 +++ skills/dbt/macros/td_incremental_scan.sql | 17 + skills/digdag/SKILL.md | 1194 +++++++++++++++++++++ skills/workflow-management/SKILL.md | 526 +++++++++ 8 files changed, 2794 insertions(+) create mode 100644 .claude-plugin/plugin.json create mode 100644 README.md create mode 100644 plugin.lock.json create mode 100644 skills/dbt/SKILL.md create mode 100644 skills/dbt/macros/override_dbt_trino.sql create mode 100644 skills/dbt/macros/td_incremental_scan.sql create mode 100644 skills/digdag/SKILL.md create mode 100644 skills/workflow-management/SKILL.md diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json new file mode 100644 index 0000000..7a0d986 --- /dev/null +++ b/.claude-plugin/plugin.json @@ -0,0 +1,14 @@ +{ + "name": "workflow-skills", + "description": "Treasure Workflow orchestration and data transformation skills including workflow creation, management, debugging, optimization, and dbt (data build tool) for Treasure Data Trino", + "version": "0.0.0-2025.11.28", + "author": { + "name": "Treasure Data", + "email": "support@treasuredata.com" + }, + "skills": [ + "./skills/digdag", + "./skills/workflow-management", + "./skills/dbt" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..34a8b39 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# workflow-skills + +Treasure Workflow orchestration and data transformation skills including workflow creation, management, debugging, optimization, and dbt (data build tool) for Treasure Data Trino diff --git a/plugin.lock.json b/plugin.lock.json new file mode 100644 index 0000000..9f3a133 --- /dev/null +++ b/plugin.lock.json @@ -0,0 +1,60 @@ +{ + "$schema": "internal://schemas/plugin.lock.v1.json", + "pluginId": "gh:treasure-data/td-skills:workflow-skills", + "normalized": { + "repo": null, + "ref": "refs/tags/v20251128.0", + "commit": "58a6893269fc962dab6eb96be0edc6e1bb313d4c", + "treeHash": "b79d9b2bd77e12d7d27ec31608cd5d958efc9a38947dfd031744dc83adbfc261", + "generatedAt": "2025-11-28T10:28:46.206672Z", + "toolVersion": "publish_plugins.py@0.2.0" + }, + "origin": { + "remote": "git@github.com:zhongweili/42plugin-data.git", + "branch": "master", + "commit": "aa1497ed0949fd50e99e70d6324a29c5b34f9390", + "repoRoot": "/Users/zhongweili/projects/openmind/42plugin-data" + }, + "manifest": { + "name": "workflow-skills", + "description": "Treasure Workflow orchestration and data transformation skills including workflow creation, management, debugging, optimization, and dbt (data build tool) for Treasure Data Trino" + }, + "content": { + "files": [ + { + "path": "README.md", + "sha256": "d1b3766bb6e8c36b488ee268025e7df332426c6601ce4e54d8ff988cbc4bc47c" + }, + { + "path": ".claude-plugin/plugin.json", + "sha256": "4f8ecba973462ff56d1314fce40dc07c0fee693dd5f08534845c739fcd20d97e" + }, + { + "path": "skills/dbt/SKILL.md", + "sha256": "be47fbfb69076da8ae38168e3cd83041d1c1f5a2420d65fd9b64848b719f28ca" + }, + { + "path": "skills/dbt/macros/td_incremental_scan.sql", + "sha256": "e4c7590e0356a22f2d6869965b61b4092b6dcd820a96416815d58eb5e06fe504" + }, + { + "path": "skills/dbt/macros/override_dbt_trino.sql", + "sha256": 
"7a14843c36bc0bbc7dd2fc1d994faaeca41ef93cd08461b7b2ff52c89b5e6056" + }, + { + "path": "skills/digdag/SKILL.md", + "sha256": "6200a49fc93268086792cbbcc0136ed11f8d9b285bbcc1f452a0624ae1bc8895" + }, + { + "path": "skills/workflow-management/SKILL.md", + "sha256": "3568c5bfbc46e7811a5c018ecbb91e4dd0171882f1dfa95b8f98aecaf232354f" + } + ], + "dirSha256": "b79d9b2bd77e12d7d27ec31608cd5d958efc9a38947dfd031744dc83adbfc261" + }, + "security": { + "scannedAt": null, + "scannerVersion": null, + "flags": [] + } +} \ No newline at end of file diff --git a/skills/dbt/SKILL.md b/skills/dbt/SKILL.md new file mode 100644 index 0000000..076d67e --- /dev/null +++ b/skills/dbt/SKILL.md @@ -0,0 +1,817 @@ +--- +name: dbt +description: Expert guidance for using dbt (data build tool) with Treasure Data Trino. Use this skill when users need help setting up dbt with TD, creating models, using TD-specific macros, handling incremental models, or troubleshooting dbt-trino adapter issues. +--- + +# dbt with Treasure Data Trino + +Expert assistance for using dbt (data build tool) with Treasure Data's Trino engine. + +## When to Use This Skill + +Use this skill when: +- Setting up dbt with Treasure Data Trino +- Creating dbt models for TD +- Writing TD-specific dbt macros +- Implementing incremental models with TD_INTERVAL +- Troubleshooting dbt-trino adapter errors +- Overriding dbt-trino macros for TD compatibility +- Managing dbt projects with TD data pipelines + +## Prerequisites + +### Installation + +**Recommended: Using uv (modern Python package manager):** + +`uv` is a fast, modern Python package and environment manager written in Rust. It's significantly faster than traditional pip and provides better dependency resolution. + +```bash +# Install uv (choose one): +# Option 1: Homebrew (recommended for Mac) +brew install uv + +# Option 2: Standalone installer +curl -LsSf https://astral.sh/uv/install.sh | sh + +# Create and activate virtual environment with uv +uv venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate + +# Install dbt-core and dbt-trino (much faster than pip) +uv pip install dbt-core dbt-trino==1.9.3 + +# Verify installation +dbt --version +``` + +**Benefits of uv:** +- **10-100x faster** than pip for package installation +- **Better dependency resolution** with clearer error messages +- **Drop-in replacement** for pip (use `uv pip` instead of `pip`) +- **Built-in virtual environment management** with `uv venv` + +**Alternative: Using traditional pip and venv:** +```bash +# Create virtual environment (recommended) +python3 -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Note: brew install dbt doesn't work well on Mac OS X +# Install dbt-core and dbt-trino +pip install dbt-core dbt-trino==1.9.3 + +# Verify installation +dbt --version +# Expected output: +# Core: 1.10.9 +# Plugins: trino: 1.9.3 +``` + +### TD Connection Setup + +Create `profiles.yml` (can be in `~/.dbt/profiles.yml` or at project root): + +```yaml +td: + target: dev + outputs: + dev: + type: trino + method: none # Use 'none' for API key authentication + user: "{{ env_var('TD_API_KEY') }}" # TD API key from environment variable + password: dummy # Password is not used with API key + host: api-presto.treasuredata.com + port: 443 + database: td # Always 'td' for Treasure Data + schema: your_dev_database # Your dev TD database (e.g., 'dev_analytics') + threads: 4 + http_scheme: https + session_properties: + query_max_run_time: 1h + + prod: + type: trino + method: none + user: "{{ 
env_var('TD_API_KEY') }}" + password: dummy + host: api-presto.treasuredata.com + port: 443 + database: td + schema: your_prod_database # Your prod TD database (e.g., 'production') + threads: 4 + http_scheme: https + session_properties: + query_max_run_time: 1h +``` + +**Important TD-specific settings:** +- `method`: Set to `none` for API key authentication (not `ldap`) +- `user`: Use TD API key from `TD_API_KEY` environment variable +- `password`: Set to `dummy` (not used with API key authentication) +- `host`: Always `api-presto.treasuredata.com` (even though it's actually Trino) +- `database`: Always set to `td` for Treasure Data +- `schema`: Set to your actual TD database name (what you see in TD Console) + +**Set up your TD API key:** +```bash +# Get your API key from TD Console: https://console.treasuredata.com/app/users +export TD_API_KEY="your_api_key_here" + +# Or add to your shell profile (~/.bashrc, ~/.zshrc, etc.) +echo 'export TD_API_KEY="your_api_key_here"' >> ~/.zshrc +``` + +**Switch between dev and prod:** +```bash +# Run against dev (default) +dbt run + +# Run against prod +dbt run --target prod +``` + +### dbt Project Configuration + +Create or update `dbt_project.yml` with TD-specific settings: + +```yaml +name: 'my_td_project' +version: '1.0.0' +config-version: 2 + +# This setting configures which "profile" dbt uses for this project. +profile: 'td' + +# These configurations specify where dbt should look for different types of files. +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_packages" + +# SSL certificate validation (required for TD) +flags: + require_certificate_validation: true + +# Global variable for default time range +vars: + target_range: '-3M/now' # Default: last 3 months to now + +# Model configuration with TD-specific settings +models: + my_td_project: + +materialized: table + +on_schema_change: "append_new_columns" # Auto-add new columns instead of failing + +views_enabled: false # TD doesn't support views (use tables) + + # Staging models + staging: + +materialized: table + +tags: ["staging"] + + # Marts models + marts: + +materialized: table + +tags: ["marts"] + + # Incremental models + incremental: + +materialized: incremental + +on_schema_change: "append_new_columns" + +tags: ["incremental"] +``` + +**Key TD-specific settings:** +- `flags.require_certificate_validation: true` - Required for SSL validation with TD +- `vars.target_range: '-3M/now'` - Default time range for all models using the variable +- `+on_schema_change: "append_new_columns"` - Automatically add new columns to existing tables (prevents rebuild on schema changes) +- `+views_enabled: false` - Explicitly disable views since TD doesn't support `CREATE VIEW` + +**Benefits:** +- **SSL security**: Ensures certificate validation for secure TD connections +- **Schema evolution**: New columns are added automatically without dropping tables +- **Default time window**: All models using `{{ var('target_range') }}` get sensible default +- **No views**: Prevents accidental view creation attempts + +## Required TD-Specific Overrides + +TD's Presto/Trino has limitations that require overriding some dbt-trino macros. You MUST create this file in your dbt project. + +### Create `macros/override_dbt_trino.sql` + +This file overrides dbt-trino macros to work with TD Presto/Trino limitations: + +**Key changes:** +1. 
Removes table ownership queries (TD doesn't support) +2. Simplifies catalog queries +3. Replaces `CREATE VIEW` with `CREATE TABLE` (TD doesn't support views) + +See the full macro file in [macros/override_dbt_trino.sql](./macros/override_dbt_trino.sql) in this skill directory. + +**Why this is needed:** +- TD Presto doesn't support `CREATE VIEW` statements +- TD doesn't expose table ownership information +- Some information_schema queries need simplification + +## TD-Specific dbt Macros + +### 1. Incremental Scan Macro + +For incremental models that process new data only: + +```sql +-- macros/td_incremental_scan.sql +{% macro incremental_scan(table_name) -%} +( + SELECT * FROM {{ table_name }} + WHERE TD_INTERVAL(time, '{{ var("target_range", "-3M/now") }}') +{% if is_incremental() -%} + AND time > {{ get_max_time(this.table) }} +{%- endif %} +) +{%- endmacro %} + +{% macro get_max_time(table_name) -%} + (SELECT MAX(time) FROM {{ table_name }}) +{%- endmacro %} +``` + +**Default behavior:** Scans last 3 months to now (`-3M/now`) if no `target_range` variable is provided. + +**Usage in model:** +```sql +-- models/incremental_events.sql +{{ + config( + materialized='incremental', + unique_key='event_id' + ) +}} + +SELECT + event_id, + user_id, + event_type, + time +FROM {{ incremental_scan('raw_events') }} +``` + +**Run with default (last 3 months):** +```bash +dbt run --select incremental_events +``` + +**Or override with specific range:** +```bash +# Yesterday only +dbt run --vars '{"target_range": "-1d"}' --select incremental_events + +# Last 7 days +dbt run --vars '{"target_range": "-7d/now"}' --select incremental_events + +# Specific date range +dbt run --vars '{"target_range": "2024-01-01/2024-01-31"}' --select incremental_events +``` + +**Note:** No need to create wrapper macros for TD time functions - they're already simple enough to use directly in your SQL. 
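For reference, on an incremental run the scan portion of the `incremental_scan('raw_events')` call in the model above compiles to roughly the following. This is a sketch assuming the default `target_range` of `-3M/now`; on the first run or with `--full-refresh`, `is_incremental()` is false and the `MAX(time)` predicate is omitted.

```sql
-- Approximate compiled scan for an incremental run of incremental_events
SELECT
    event_id,
    user_id,
    event_type,
    time
FROM (
    SELECT * FROM raw_events
    WHERE TD_INTERVAL(time, '-3M/now')
      -- Added only when is_incremental() is true;
      -- this.table resolves to the model's own table name
      AND time > (SELECT MAX(time) FROM incremental_events)
)
```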
+ +## dbt Model Patterns for TD + +### Basic Model + +```sql +-- models/daily_events.sql +{{ + config( + materialized='table' + ) +}} + +SELECT + TD_TIME_STRING(time, 'd!', 'JST') as date, + event_type, + COUNT(*) as event_count, + approx_distinct(user_id) as unique_users +FROM {{ source('raw', 'events') }} +WHERE TD_INTERVAL(time, '-30d', 'JST') +GROUP BY 1, 2 +``` + +### Incremental Model + +```sql +-- models/incremental_user_events.sql +{{ + config( + materialized='incremental', + unique_key='user_date_key' + ) +}} + +SELECT + CONCAT(CAST(user_id AS VARCHAR), '_', TD_TIME_STRING(time, 'd!', 'JST')) as user_date_key, + user_id, + TD_TIME_STRING(time, 'd!', 'JST') as date, + COUNT(*) as event_count +FROM {{ source('raw', 'events') }} +WHERE TD_INTERVAL(time, '{{ var('target_range', '-1d') }}', 'JST') +{% if is_incremental() %} + -- Only process data after last run + AND time > (SELECT MAX(time) FROM {{ this }}) +{% endif %} +GROUP BY 1, 2, 3 +``` + +### CTE (Common Table Expression) Pattern + +```sql +-- models/user_metrics.sql +{{ + config( + materialized='table' + ) +}} + +WITH events_filtered AS ( + SELECT * + FROM {{ source('raw', 'events') }} + WHERE TD_INTERVAL(time, '-7d', 'JST') +), + +user_sessions AS ( + SELECT + user_id, + TD_SESSIONIZE(time, 1800, user_id) as session_id, + MIN(time) as session_start, + MAX(time) as session_end + FROM events_filtered + GROUP BY user_id, session_id +) + +SELECT + user_id, + COUNT(DISTINCT session_id) as session_count, + AVG(session_end - session_start) as avg_session_duration +FROM user_sessions +GROUP BY user_id +``` + +## Sources Configuration + +Define TD tables as sources: + +```yaml +# models/sources.yml +version: 2 + +sources: + - name: raw + database: production + schema: default + tables: + - name: events + description: Raw event data from applications + columns: + - name: time + description: Event timestamp (Unix time) + - name: user_id + description: User identifier + - name: event_type + description: Type of event + + - name: users + description: User profile data +``` + +**Usage in models:** +```sql +SELECT * FROM {{ source('raw', 'events') }} +``` + +## Testing with TD + +### Schema Tests + +```yaml +# models/schema.yml +version: 2 + +models: + - name: daily_events + description: Daily event aggregations + columns: + - name: date + description: Event date + tests: + - not_null + - unique + + - name: event_count + description: Number of events + tests: + - not_null + - dbt_utils.expression_is_true: + expression: ">= 0" + + - name: unique_users + description: Unique user count (approximate) + tests: + - not_null +``` + +### Custom TD Tests + +```sql +-- tests/assert_positive_events.sql +-- Returns records that fail the test +SELECT * +FROM {{ ref('daily_events') }} +WHERE event_count < 0 +``` + +## Running dbt with TD + +### Basic Commands + +```bash +# Test connection +dbt debug + +# Run all models +dbt run + +# Run specific model +dbt run --select daily_events + +# Run with variables +dbt run --vars '{"target_range": "-7d"}' + +# Run tests +dbt test + +# Generate documentation +dbt docs generate +dbt docs serve +``` + +### Incremental Run Pattern + +```bash +# Daily incremental run +dbt run --select incremental_events --vars '{"target_range": "-1d"}' + +# Full refresh +dbt run --select incremental_events --full-refresh + +# Backfill specific date +dbt run --select incremental_events --vars '{"target_range": "2024-01-15"}' +``` + +## Common Issues and Solutions + +### Issue 1: "This connector does not support creating views" + 
+**Error:** +``` +TrinoUserError: This connector does not support creating views +``` + +**Solution:** +Add `macros/override_dbt_trino.sql` that overrides `trino__create_view_as` to use `CREATE TABLE` instead. + +### Issue 2: Catalog Query Failures + +**Error:** +``` +Database Error: Table ownership information not available +``` + +**Solution:** +Use the override macros that remove table ownership queries from catalog operations. + +### Issue 3: Connection Timeout + +**Error:** +``` +Connection timeout +``` + +**Solution:** +Increase session timeout in `profiles.yml` if needed (default is 1h): +```yaml +session_properties: + query_max_run_time: 2h # Increase if queries legitimately need more time +``` + +### Issue 4: Incremental Model Not Working + +**Problem:** +Incremental model processes all data every time. + +**Solution:** +Ensure unique_key is set and check incremental logic: +```sql +{{ + config( + materialized='incremental', + unique_key='event_id' -- Must be specified + ) +}} + +{% if is_incremental() %} + -- This block only runs on incremental runs + WHERE time > (SELECT MAX(time) FROM {{ this }}) +{% endif %} +``` + +### Issue 5: Variable Not Found + +**Error:** +``` +Compilation Error: Var 'target_range' is undefined +``` + +**Solution:** +Provide default value: +```sql +WHERE TD_INTERVAL(time, '{{ var('target_range', '-1d') }}', 'JST') +``` + +Or pass variable: +```bash +dbt run --vars '{"target_range": "-1d"}' +``` + +## Project Structure + +``` +dbt_project/ +├── dbt_project.yml # Project config with TD-specific settings +├── profiles.yml # Connection config (or in ~/.dbt/profiles.yml) +├── macros/ +│ ├── override_dbt_trino.sql # Required TD overrides +│ └── td_incremental_scan.sql # Optional: Incremental helper +├── models/ +│ ├── sources.yml # Source definitions +│ ├── schema.yml # Tests and documentation +│ ├── staging/ +│ │ └── stg_events.sql +│ └── marts/ +│ ├── daily_events.sql +│ └── user_metrics.sql +└── tests/ + └── assert_positive_events.sql +``` + +**Note:** `profiles.yml` can be placed either: +- At project root (recommended for TD Workflow deployments) +- In `~/.dbt/profiles.yml` (for local development) + +## Best Practices + +1. **Include time filters in all models** + - Use TD_INTERVAL or TD_TIME_RANGE directly + - Critical for performance on large tables + +2. **Use incremental models wisely** + - Good for append-only event data + - Requires careful unique_key selection + - Test thoroughly before production + +3. **Leverage sources** + - Define all TD tables as sources + - Enables lineage tracking + - Centralizes table documentation + +4. **Use variables for flexibility** + - Date ranges + - Environment-specific settings + - Makes models reusable + +5. **Test your models** + - Not null checks on key columns + - Unique checks on IDs + - Custom assertions for business logic + +6. **Document everything** + - Model descriptions + - Column descriptions + - Include TD-specific notes + +## Integration with TD Workflows + +### Running dbt with Custom Scripts (Recommended for TD Workflow) + +TD Workflow supports running dbt using Custom Scripts with Docker containers. This is the recommended approach for production deployments. 
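The steps below create three small files that sit alongside your dbt project in a single directory, which is then pushed to TD Workflow as one project. An illustrative layout (adjust names and paths to your own project) might look like this:

```
my_dbt_project/
├── dbt_workflow.dig     # Workflow definition (created below)
├── dbt_wrapper.py       # Python wrapper around dbtRunner (created below)
├── tasks.py             # Runtime package installer (created below)
├── dbt_project.yml      # Your existing dbt project
├── profiles.yml         # Profile at project root (recommended for TD Workflow)
├── models/
└── macros/
```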
+ +**Create a Python wrapper (`dbt_wrapper.py`):** +```python +#!/usr/bin/env python3 +import sys +from dbt.cli.main import dbtRunner + +def run_dbt(command_args): + """Run dbt commands using dbtRunner""" + dbt = dbtRunner() + result = dbt.invoke(command_args) + + if not result.success: + sys.exit(1) + + return result + +if __name__ == "__main__": + # Get command from arguments (e.g., ['run', '--target', 'prod']) + command = sys.argv[1:] if len(sys.argv) > 1 else ['run'] + + print(f"Running dbt with command: {' '.join(command)}") + run_dbt(command) +``` + +**Create workflow file (`dbt_workflow.dig`):** +```yaml +timezone: Asia/Tokyo + +schedule: + daily>: 03:00:00 + +_export: + docker: + image: "treasuredata/customscript-python:3.12.11-td1" + + # Set TD API key from secrets + _env: + TD_API_KEY: ${secret:td.apikey} + ++setup: + py>: tasks.InstallPackages + ++dbt_run: + py>: dbt_wrapper.run_dbt + command_args: ['run', '--target', 'prod'] + ++dbt_test: + py>: dbt_wrapper.run_dbt + command_args: ['test'] +``` + +**Create package installer (`tasks.py`):** +```python +def InstallPackages(): + """Install dbt and dependencies at runtime""" + import subprocess + import sys + + packages = [ + 'dbt-core==1.10.9', + 'dbt-trino==1.9.3' + ] + + for package in packages: + subprocess.check_call([ + sys.executable, '-m', 'pip', 'install', package + ]) +``` + +**Deploy to TD Workflow:** +```bash +# 1. Clean dbt artifacts +dbt clean + +# 2. Push to TD Workflow +td workflow push my_dbt_project + +# 3. Set TD API key secret +td workflow secrets --project my_dbt_project --set td.apikey=YOUR_API_KEY + +# 4. Run from TD Console or trigger manually +td workflow start my_dbt_project dbt_workflow --session now +``` + +**Important notes:** +- Use Docker image: `treasuredata/customscript-python:3.12.11-td1` (latest stable image with Python 3.12) +- Install dependencies at runtime using `py>: tasks.InstallPackages` +- Store API key in TD secrets: `${secret:td.apikey}` +- Include your dbt project files (models, macros, profiles.yml, dbt_project.yml) + +### Local Digdag + dbt Integration (Development) + +For local development and testing: + +```yaml +# workflow.dig ++dbt_run: + sh>: dbt run --vars '{"target_range": "${session_date}"}' + ++dbt_test: + sh>: dbt test +``` + +### Scheduled dbt Runs + +```yaml +# daily_dbt_workflow.dig +timezone: Asia/Tokyo + +schedule: + daily>: 03:00:00 + +_export: + session_date: ${session_date} + ++run_incremental_models: + sh>: | + cd /path/to/dbt_project + dbt run --select tag:incremental --vars '{"target_range": "-1d"}' + ++run_tests: + sh>: | + cd /path/to/dbt_project + dbt test --select tag:incremental + ++notify_completion: + echo>: "dbt run completed for ${session_date}" +``` + +## Advanced Patterns + +### Dynamic Table Selection + +```sql +-- models/flexible_aggregation.sql +{{ + config( + materialized='table' + ) +}} + +{% set table_name = var('source_table', 'events') %} +{% set metric = var('metric', 'event_count') %} + +SELECT + TD_TIME_STRING(time, 'd!', 'JST') as date, + COUNT(*) as {{ metric }} +FROM {{ source('raw', table_name) }} +WHERE TD_INTERVAL(time, '{{ var('target_range', '-7d') }}', 'JST') +GROUP BY 1 +``` + +### Multi-Source Union + +```sql +-- models/unified_events.sql +{{ + config( + materialized='table' + ) +}} + +{% set sources = ['mobile_events', 'web_events', 'api_events'] %} + +{% for source in sources %} + SELECT + '{{ source }}' as source_type, + * + FROM {{ source('raw', source) }} + WHERE TD_INTERVAL(time, '-1d', 'JST') + {% if not loop.last %}UNION 
ALL{% endif %} +{% endfor %} +``` + +## Resources + +- dbt Documentation: https://docs.getdbt.com/ +- dbt-trino adapter: https://github.com/starburstdata/dbt-trino +- TD Query Engine: Use Trino-specific SQL +- TD Functions: TD_INTERVAL, TD_TIME_STRING, etc. + +## Migration from SQL Scripts to dbt + +If migrating existing TD SQL workflows to dbt: + +1. **Convert queries to models** + - Add config block + - Use source() for table references + - Add TD-specific macros + +2. **Add tests** + - Start with basic not_null tests + - Add unique key tests + - Create custom business logic tests + +3. **Implement incrementally** + - Start with simple table materializations + - Add incremental models gradually + - Test each model thoroughly + +4. **Update orchestration** + - Replace direct SQL in digdag with dbt commands + - Maintain existing schedules + - Add dbt test steps diff --git a/skills/dbt/macros/override_dbt_trino.sql b/skills/dbt/macros/override_dbt_trino.sql new file mode 100644 index 0000000..c7b78a6 --- /dev/null +++ b/skills/dbt/macros/override_dbt_trino.sql @@ -0,0 +1,163 @@ +-- Import from dbt-trino v1.7.1 +-- https://github.com/starburstdata/dbt-trino/blob/v1.7.1/dbt/include/trino/macros/catalog.sql +-- To remove unnecessary parts that cause errors with TD Presto +-- https://github.com/starburstdata/dbt-trino/blob/1.4.latest/dbt/include/trino/macros/catalog.sql#L40-L59 +-- https://github.com/starburstdata/dbt-trino/issues/298 + +{% macro trino__get_catalog(information_schema, schemas) -%} + + {% set query %} + with tables as ( + {{ trino__get_catalog_tables_sql(information_schema) }} + {{ trino__get_catalog_schemas_where_clause_sql(schemas) }} + ), + columns as ( + {{ trino__get_catalog_columns_sql(information_schema) }} + {{ trino__get_catalog_schemas_where_clause_sql(schemas) }} + ) + {{ trino__get_catalog_results_sql() }} + {%- endset -%} + + {{ return(run_query(query)) }} + +{%- endmacro %} + + +{% macro trino__get_catalog_relations(information_schema, relations) -%} + + {% set query %} + with tables as ( + {{ trino__get_catalog_tables_sql(information_schema) }} + {{ trino__get_catalog_relations_where_clause_sql(relations) }} + ), + columns as ( + {{ trino__get_catalog_columns_sql(information_schema) }} + {{ trino__get_catalog_relations_where_clause_sql(relations) }} + ) + {{ trino__get_catalog_results_sql() }} + {%- endset -%} + + {{ return(run_query(query)) }} + +{%- endmacro %} + + +{% macro trino__get_catalog_tables_sql(information_schema) -%} + select + table_catalog as "table_database", + table_schema as "table_schema", + table_name as "table_name", + table_type as "table_type", + null as "table_owner" + from {{ information_schema }}.tables +{%- endmacro %} + + +{% macro trino__get_catalog_columns_sql(information_schema) -%} + select + table_catalog as "table_database", + table_schema as "table_schema", + table_name as "table_name", + column_name as "column_name", + ordinal_position as "column_index", + data_type as "column_type", + comment as "column_comment" + from {{ information_schema }}.columns +{%- endmacro %} + + +{% macro trino__get_catalog_results_sql() -%} + select + table_database, + table_schema, + table_name, + table_type, + table_owner, + column_name, + column_index, + column_type, + column_comment + from tables + join columns using ("table_database", "table_schema", "table_name") + order by "column_index" +{%- endmacro %} + + +{% macro trino__get_catalog_schemas_where_clause_sql(schemas) -%} + where + table_schema != 'information_schema' + and + table_schema in 
('{{ schemas | join("','") | lower }}') +{%- endmacro %} + + +{% macro trino__get_catalog_relations_where_clause_sql(relations) -%} + where + table_schema != 'information_schema' + and + ( + {%- for relation in relations -%} + {% if relation.schema and relation.identifier %} + ( + table_schema = '{{ relation.schema | lower }}' + and table_name = '{{ relation.identifier | lower }}' + ) + {% elif relation.schema %} + ( + table_schema = '{{ relation.schema | lower }}' + ) + {% else %} + {% do exceptions.raise_compiler_error( + '`get_catalog_relations` requires a list of relations, each with a schema' + ) %} + {% endif %} + + {%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) +{%- endmacro %} + + +-- - get_catalog +-- - list_relations_without_caching +-- - get_columns_in_relation + +-- Import from dbt-trino v1.1 +-- https://github.com/starburstdata/dbt-trino/blob/1.1.latest/dbt/include/trino/macros/adapters.sql +-- To remove unnecessary parts that cause errors with TD Presto +-- https://github.com/starburstdata/dbt-trino/blob/1.4.latest/dbt/include/trino/macros/adapters.sql#L29-L48 +-- https://github.com/starburstdata/dbt-trino/issues/298 +{% macro trino__list_relations_without_caching(relation) %} + {% call statement('list_relations_without_caching', fetch_result=True) -%} + select + table_catalog as database, + table_name as name, + table_schema as schema, + case when table_type = 'BASE TABLE' then 'table' + when table_type = 'VIEW' then 'view' + else table_type + end as table_type + from {{ relation.information_schema() }}.tables + where table_schema = '{{ relation.schema | lower }}' + {% endcall %} + {{ return(load_result('list_relations_without_caching').table) }} +{% endmacro %} + +-- Override dbt-trino "trino__create_view_as" macro with "create table if not exists" +-- https://github.com/starburstdata/dbt-trino/blob/1.4.latest/dbt/include/trino/macros/adapters.sql#L102-L115 +-- To void unsupported "create view" action with TD Presto +-- Database Error in model dbt_results (models/dbt_results.sql) +-- TrinoUserError(type=USER_ERROR, name=NOT_SUPPORTED, message="This connector does not support creating views") +{% macro trino__create_view_as(relation, sql) -%} + {%- set view_security = config.get('view_security', 'definer') -%} + {%- if view_security not in ['definer', 'invoker'] -%} + {%- set log_message = 'Invalid value for view_security (%s) specified. Setting default value (%s).' 
% (view_security, 'definer') -%} + {% do log(log_message) %} + {%- set on_table_exists = 'definer' -%} + {% endif %} + create table if not exists + {{ relation }} + as + {{ sql }} + ; +{% endmacro %} diff --git a/skills/dbt/macros/td_incremental_scan.sql b/skills/dbt/macros/td_incremental_scan.sql new file mode 100644 index 0000000..ae94ed3 --- /dev/null +++ b/skills/dbt/macros/td_incremental_scan.sql @@ -0,0 +1,17 @@ +-- TD Incremental Scan Macro +-- Scans a table for a specific time range and optionally filters for incremental processing +-- Default target_range is '-3M/now' (last 3 months to now) + +{% macro incremental_scan(table_name) -%} +( + SELECT * FROM {{ table_name }} + WHERE TD_INTERVAL(time, '{{ var("target_range", "-3M/now") }}') +{% if is_incremental() -%} + AND time > {{ get_max_time(this.table) }} +{%- endif %} +) +{%- endmacro %} + +{% macro get_max_time(table_name) -%} + (SELECT MAX(time) FROM {{ table_name }}) +{%- endmacro %} diff --git a/skills/digdag/SKILL.md b/skills/digdag/SKILL.md new file mode 100644 index 0000000..977b2ff --- /dev/null +++ b/skills/digdag/SKILL.md @@ -0,0 +1,1194 @@ +--- +name: digdag +description: Expert assistance for designing, implementing, and debugging Treasure Workflow (digdag) for Treasure Data. Use this skill when users need help with workflow YAML configuration, workflow orchestration, error handling, or scheduling. +--- + +# Treasure Workflow Expert + +Expert assistance for creating and managing Treasure Workflow (powered by digdag) in Treasure Data environments. + +## When to Use This Skill + +Use this skill when: +- Creating new Treasure Workflow definitions +- Debugging existing Treasure Workflows +- Implementing error handling and retry logic +- Setting up workflow schedules and dependencies +- Optimizing workflow performance +- Working with workflow operators (td, sh, py, etc.) + +## Core Principles + +### 1. Basic Workflow Structure + +A Treasure Workflow is defined in a `.dig` file. **The filename becomes the workflow name** - for example, `hello_world.dig` creates a "hello_world" workflow. + +```yaml +timezone: UTC + +schedule: + daily>: 02:00:00 + +_export: + td: + database: my_database + ++start: + echo>: "Workflow started at ${session_time}" + ++query: + td>: queries/my_query.sql + ++finish: + echo>: "Workflow completed" +``` + +**Key points:** +- `.dig` file extension required +- Workflow name = filename (without .dig) +- `timezone:` defaults to UTC if not specified +- Tasks are defined with `+` prefix and run top-to-bottom sequentially + +### 2. Workflow Configuration + +**Essential top-level configurations:** + +```yaml +# Set timezone for schedule +timezone: Asia/Tokyo + +# Schedule workflow execution +schedule: + daily>: 02:00:00 # Daily at 2 AM + # hourly>: 00:00 # Every hour + # cron>: "0 */4 * * *" # Every 4 hours + # weekly>: "Mon,00:00:00" # Every Monday + +# Export common parameters +_export: + td: + database: production_db + engine: presto # or hive + my_param: "value" +``` + +### 3. Task Structure + +Tasks are defined with `+task_name:` prefix. Tasks run sequentially from top to bottom and can be nested as children of other tasks. 
+ +```yaml ++task1: + echo>: "This is task 1" + ++task2: + sh>: echo "This is task 2" + ++parent_task: + echo>: "Parent task" + + +child_task: + echo>: "This runs as a child of parent_task" + + +another_child: + echo>: "This also runs as a child" + ++task3: + td>: queries/query.sql + create_table: result_table +``` + +**Important:** The syntax `foo>: bar` is syntactic sugar for setting both `_type: foo` and `_command: bar` parameters. + +### 4. TD Operator + +The `td>` operator runs TD queries: + +**Inline SQL:** +```yaml ++analyze_events: + td>: + query: | + SELECT + TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'JST') as date, + COUNT(*) as event_count + FROM events + WHERE TD_TIME_RANGE( + time, + '${session_date_compact}', + TD_TIME_ADD('${session_date_compact}', '1d') + ) + GROUP BY 1 + database: analytics + engine: presto + create_table: daily_events +``` + +**External SQL file:** +```yaml ++run_query: + td>: queries/analysis.sql + database: analytics + engine: presto + create_table: analysis_results + + # Or insert into existing table + # insert_into: existing_table + + # Or download results + # download_file: results.csv +``` + +### 5. Session Variables + +Digdag provides built-in session variables for dynamic workflows accessible via `${variable_name}` syntax: + +**Always available variables:** + +| Variable | Description | Example | +|----------|-------------|---------| +| `${timezone}` | Timezone of workflow | `America/Los_Angeles` | +| `${project_id}` | Project ID | `12345` | +| `${session_uuid}` | Unique session UUID | `414a8b9e-b365-4394-916a-f0ed9987bd2b` | +| `${session_id}` | Integer session ID | `2381` | +| `${session_time}` | Session timestamp with timezone | `2016-01-30T00:00:00-08:00` | +| `${session_date}` | Date part (YYYY-MM-DD) | `2016-01-30` | +| `${session_date_compact}` | Date part (YYYYMMDD) | `20160130` | +| `${session_local_time}` | Local time format | `2016-01-30 00:00:00` | +| `${session_tz_offset}` | Timezone offset | `-0800` | +| `${session_unixtime}` | Unix epoch seconds | `1454140800` | +| `${task_name}` | Full task name path | `+myworkflow+parent+child` | +| `${attempt_id}` | Integer attempt ID | `7` | + +**Schedule-only variables** (when `schedule:` is configured): + +| Variable | Hourly Schedule Example | Daily Schedule Example | +|----------|------------------------|------------------------| +| `${last_session_time}` | `2016-01-29T23:00:00-08:00` | `2016-01-29T00:00:00-08:00` | +| `${last_session_date}` | `2016-01-29` | `2016-01-29` | +| `${last_session_date_compact}` | `20160129` | `20160129` | +| `${last_session_local_time}` | `2016-01-29 23:00:00` | `2016-01-29 00:00:00` | +| `${last_session_tz_offset}` | `-0800` | `-0800` | +| `${last_session_unixtime}` | `1454137200` | `1454054400` | +| `${last_executed_session_time}` | `2016-01-29T23:00:00-08:00` | `2016-01-29T00:00:00-08:00` | +| `${last_executed_session_unixtime}` | `1454137200` | `1454054400` | +| `${next_session_time}` | `2016-01-30T01:00:00-08:00` | `2016-01-31T00:00:00-08:00` | +| `${next_session_date}` | `2016-01-30` | `2016-01-31` | +| `${next_session_date_compact}` | `20160130` | `20160131` | +| `${next_session_local_time}` | `2016-01-30 01:00:00` | `2016-01-31 00:00:00` | +| `${next_session_tz_offset}` | `-0800` | `-0800` | +| `${next_session_unixtime}` | `1454144400` | `1454227200` | + +**Notes:** +- `last_session_time` = timestamp of last schedule (calculated, not actual execution) +- `last_executed_session_time` = timestamp of previously executed session +- Built-in variables cannot be 
overwritten + +**Using variables in queries:** +```yaml ++daily_job: + td>: + query: | + SELECT * FROM events + WHERE TD_TIME_RANGE( + time, + '${session_date}', -- YYYY-MM-DD format + TD_TIME_ADD('${session_date}', '1d') + ) +``` + +**Calculating variables with JavaScript:** + +You can use JavaScript expressions within `${...}` syntax. Digdag includes Moment.js for time calculations: + +```yaml +timezone: America/Los_Angeles + ++format_session_time: + # "2016-09-24 00:00:00 -0700" + echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")} + ++format_in_utc: + # "2016-09-24 07:00:00" + echo>: ${moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")} + ++format_tomorrow: + # "September 24, 2016 12:00 AM" + echo>: ${moment(session_time).add(1, 'days').format("LLL")} + ++get_execution_time: + # Current execution time: "2016-09-24 05:24:49 -0700" + echo>: ${moment().format("YYYY-MM-DD HH:mm:ss Z")} +``` + +### 6. Task Dependencies + +**Sequential execution (default):** +```yaml ++step1: + echo>: "First" + ++step2: + echo>: "Second (runs after step1)" +``` + +**Parallel execution:** + +Use `_parallel: true` to run child tasks concurrently (only affects direct children, not grandchildren): + +```yaml ++parallel_tasks: + _parallel: true + + +task_a: + echo>: "Runs in parallel" + + +task_b: + echo>: "Runs in parallel" + + +task_c: + echo>: "Runs in parallel" + ++after_parallel: + echo>: "Runs after all parallel tasks complete" +``` + +**Limited parallel execution:** + +Use `_parallel: {limit: N}` to limit concurrent execution to N tasks: + +```yaml ++prepare: + # +data1 and +data2 run in parallel first + # Then +data3 and +data4 run in parallel (after first two succeed) + _parallel: + limit: 2 + + +data1: + py>: tasks.PrepareWorkflow.prepare_data1 + + +data2: + py>: tasks.PrepareWorkflow.prepare_data2 + + +data3: + py>: tasks.PrepareWorkflow.prepare_data3 + + +data4: + py>: tasks.PrepareWorkflow.prepare_data4 + ++analyze: + py>: tasks.AnalyzeWorkflow.analyze_prepared_data_sets +``` + +**Background execution:** + +Use `_background: true` to run a task in parallel with previous tasks. The next task waits for background task completion: + +```yaml ++prepare: + +data1: + py>: tasks.PrepareWorkflow.prepare_data1 + + # +data1 and +data2 run in parallel + +data2: + _background: true + py>: tasks.PrepareWorkflow.prepare_data2 + + # +data3 runs after both +data1 and +data2 complete + +data3: + py>: tasks.PrepareWorkflow.prepare_data3 + ++analyze: + py>: tasks.AnalyzeWorkflow.analyze_prepared_data_sets +``` + +### 7. 
Error Handling + +**Group-level retry:** + +If `_retry: N` is set on a group, it retries the entire group from the beginning when any child fails: + +```yaml ++prepare: + # If any child task fails, retry the entire group up to 3 times + _retry: 3 + + +erase_table: + py>: tasks.PrepareWorkflow.erase_table + + +load_data: + py>: tasks.PrepareWorkflow.load_data + + +check_loaded_data: + py>: tasks.PrepareWorkflow.check_loaded_data + ++analyze: + py>: tasks.AnalyzeWorkflow.analyze_prepared_data_sets +``` + +**Task-level retry:** + +Individual tasks can also use `_retry: N`, though some operators have their own retry options: + +```yaml ++query_with_retry: + td>: queries/important_query.sql + _retry: 3 +``` + +**Retry with intervals:** + +Configure retry intervals with exponential or constant backoff: + +```yaml ++prepare: + _retry: + limit: 3 # Number of retries + interval: 10 # Interval in seconds + interval_type: exponential # or "constant" + + +load_data: + py>: tasks.PrepareWorkflow.load_data +``` + +With `exponential` type: +- 1st retry: 10 seconds +- 2nd retry: 20 seconds (10 × 2^1) +- 3rd retry: 40 seconds (10 × 2^2) + +With `constant` type (default): +- All retries: 10 seconds + +**Error handling tasks:** + +Use `_error:` to run operators when a workflow fails: + +```yaml +# Runs when workflow fails +_error: + py>: tasks.ErrorWorkflow.runs_when_workflow_failed + ++main_task: + td>: queries/analysis.sql + + _error: + +send_alert: + sh>: python scripts/send_slack_alert.py "Main task failed" + ++another_task: + py>: tasks.process_data +``` + +**Email notifications on error:** +```yaml +_error: + mail>: + from: workflow@example.com + to: [alerts@example.com] + subject: "Workflow ${task_name} failed" + body: "Session: ${session_time}" +``` + +### 8. Defining Variables + +**Using `_export:` directive:** + +The `_export:` directive defines variables within a scope. Variables are available to the task and all its children: + +```yaml +_export: + foo: 1 # Available to all tasks + ++prepare: + py>: tasks.MyWorkflow.prepare + # Can use ${foo} + ++analyze: + _export: + bar: 2 # Only available to +analyze and its children + + +step1: + py>: tasks.MyWorkflow.analyze_step1 + # Can use ${foo} and ${bar} + ++dump: + py>: tasks.MyWorkflow.dump + # Can use ${foo}, but NOT ${bar} +``` + +**Key points:** +- Top-level `_export:` makes variables available to all tasks +- Task-level `_export:` makes variables available to that task and its children only +- Built-in variables cannot be overwritten + +**Using API (Python):** + +Variables can be set programmatically using language APIs: + +```python +import digdag + +class MyWorkflow(object): + def prepare(self): + # store() makes variables available to ALL following tasks + digdag.env.store({"my_param": 2}) + + def export_and_call_child(self): + # export() makes variables available to children only + digdag.env.export({"my_param": 2}) + digdag.env.add_subtask({'_type': 'call', '_command': 'child1.dig'}) +``` + +**Differences:** +- `digdag.env.store(dict)` → Available to all following tasks (like task-level variable) +- `digdag.env.export(dict)` → Available to children only (like `_export:` in YAML) + +**Parameter Store (secrets):** + +Store and retrieve secrets securely from TD parameter store: + +```yaml +_export: + # Reference secrets from TD parameter store + api_key: ${secret:api_credentials.api_key} + db_password: ${secret:database.password} + ++call_api: + py>: scripts.api_caller.main + api_key: ${api_key} +``` + +### 9. 
Including External Files + +Use `!include` to organize complex workflows across multiple files: + +```yaml +_export: + mysql: + !include : 'config/mysql.dig' + hive: + !include : 'config/hive.dig' + +!include : 'tasks/foo.dig' +``` + +**Note:** A whitespace before `:` is required for valid YAML syntax. + +**Example structure:** +``` +my_workflow/ +├── my_workflow.dig +├── config/ +│ ├── mysql.dig +│ └── hive.dig +└── tasks/ + └── foo.dig +``` + +**config/mysql.dig:** +```yaml +host: mysql.example.com +port: 3306 +database: production +``` + +**tasks/foo.dig:** +```yaml ++extract: + td>: queries/extract.sql + ++transform: + td>: queries/transform.sql +``` + +### 10. Python Operator + +Run Python scripts: + +```yaml ++python_task: + py>: scripts.data_processor.process + database: ${td.database} + table: events + + # Can pass parameters + _env: + TD_API_KEY: ${secret:td.apikey} +``` + +Python script structure (`scripts/data_processor.py`): +```python +def process(database, table): + """ + Process data from TD table + """ + import pytd + + client = pytd.Client(database=database) + df = client.query(f"SELECT * FROM {table} LIMIT 1000") + + # Process data + result = df.shape[0] + + # Return value to use in workflow + return {'processed_count': result} +``` + +### 11. Shell Operator + +Run shell commands: + +```yaml ++shell_task: + sh>: bash scripts/process_data.sh + ++inline_shell: + sh>: | + echo "Starting process" + date + echo "Process complete" +``` + +## Common Patterns + +### Daily ETL Pipeline + +```yaml +timezone: Asia/Tokyo + +schedule: + daily>: 03:00:00 + +_export: + td: + database: analytics + engine: presto + ++start: + echo>: "ETL started for ${session_date}" + ++extract: + td>: queries/extract_raw_events.sql + create_table: raw_events_${session_date_compact} + ++transform: + td>: queries/transform_events.sql + create_table: transformed_events_${session_date_compact} + ++load: + td>: + query: | + INSERT INTO events_daily + SELECT * FROM transformed_events_${session_date_compact} + ++cleanup: + td>: + query: | + DROP TABLE IF EXISTS raw_events_${session_date_compact}; + DROP TABLE IF EXISTS transformed_events_${session_date_compact}; + ++notify: + sh>: python scripts/send_completion_notification.py "${session_date}" + + _error: + +alert_failure: + sh>: python scripts/send_failure_alert.py "${session_date}" +``` + +### Parallel Data Processing + +```yaml +timezone: UTC + +schedule: + hourly>: 00:00 + +_export: + td: + database: events + engine: presto + ++process_regions: + _parallel: true + + +process_us: + td>: queries/process_region.sql + create_table: us_events_${session_date_compact} + region: 'US' + + +process_eu: + td>: queries/process_region.sql + create_table: eu_events_${session_date_compact} + region: 'EU' + + +process_asia: + td>: queries/process_region.sql + create_table: asia_events_${session_date_compact} + region: 'ASIA' + ++aggregate: + td>: + query: | + SELECT * FROM us_events_${session_date_compact} + UNION ALL + SELECT * FROM eu_events_${session_date_compact} + UNION ALL + SELECT * FROM asia_events_${session_date_compact} + create_table: global_events_${session_date_compact} +``` + +### Incremental Processing + +```yaml +timezone: Asia/Tokyo + +schedule: + daily>: 01:00:00 + +_export: + td: + database: analytics + engine: presto + ++get_last_processed: + td>: + query: | + SELECT MAX(processed_date) as last_date + FROM processing_log + store_last_results: true + ++process_new_data: + td>: + query: | + SELECT * + FROM source_table + WHERE TD_TIME_RANGE( + time, + 
COALESCE('${td.last_results.last_date}', '2024-01-01'), + '${session_date}' + ) + create_table: incremental_data + ++update_log: + td>: + query: | + INSERT INTO processing_log + VALUES ('${session_date}', current_timestamp) +``` + +### Multi-stage Pipeline with Dependencies + +```yaml +timezone: Asia/Tokyo + +_export: + td: + database: production + engine: presto + ++stage1_extract: + _parallel: true + + +extract_users: + td>: queries/extract_users.sql + create_table: staging_users + + +extract_events: + td>: queries/extract_events.sql + create_table: staging_events + ++stage2_join: + td>: queries/join_users_events.sql + create_table: user_events + + # This runs only after all stage1 tasks complete + ++stage3_aggregate: + _parallel: true + + +daily_stats: + td>: queries/daily_aggregation.sql + create_table: daily_stats + + +user_segments: + td>: queries/user_segmentation.sql + create_table: user_segments + ++stage4_export: + td>: queries/final_export.sql + download_file: export_${session_date_compact}.csv +``` + +## Workflow Project Structure + +A TD workflow project typically follows this structure: + +``` +my_workflow/ +├── workflow.dig # Main workflow definition +├── queries/ # SQL query files +│ ├── query1.sql +│ └── query2.sql +└── scripts/ # Python/shell scripts (optional) + └── process.py +``` + +**Key conventions:** +- Workflow files use `.dig` extension +- SQL queries are stored in `queries/` directory +- Python scripts go in `scripts/` directory +- Project name matches the workflow directory name + +## Treasure Workflow CLI Commands (tdx command) + +Treasure Workflow uses the `tdx` command-line tool for managing workflows. All workflow operations are performed using `tdx wf` (or `tdx workflow`). + +### Creating and Testing Workflows Locally + +**Push and run workflow:** +```bash +# Push workflow to TD (this registers and schedules if schedule: is defined) +tdx wf push my_workflow + +# Push from within workflow directory +cd my_workflow +tdx wf push . + +# Immediately run a workflow (returns attempt_id for monitoring) +tdx wf run my_project.my_workflow + +# Run with custom session time +tdx wf run my_project.my_workflow --session "2024-01-15T00:00:00+00:00" + +# The run command returns an attempt_id that you can use to monitor execution: +# Example output: "Started session attempt_id: 12345678" + +# Use attempt_id to check task status +tdx wf attempt 12345678 tasks + +# View logs for specific tasks +tdx wf attempt 12345678 logs +task_name +``` + +**Verify created tables:** +```bash +# List tables in database +tdx tables "database_name.*" + +# Show table schema +tdx describe database_name.table_name + +# Show table contents +tdx show database_name.table_name --limit 10 +``` + +### Pushing and Scheduling Workflows + +**Register workflow with TD:** +```bash +# Push workflow to TD (registers and schedules if schedule: is defined) +tdx wf push my_workflow + +# Push from within workflow directory +cd my_workflow +tdx wf push . + +# Push with custom revision name +tdx wf push my_workflow --revision v1.0.0 + +# Push with a different project name +tdx wf push my_workflow --name production_workflow +``` + +**Note:** When you push a workflow with a `schedule:` section, it automatically starts running on that schedule. 
+ +### Managing Workflows + +**List registered workflows:** +```bash +# List all workflow projects +tdx wf projects + +# Filter projects by pattern +tdx wf projects "my_workflow_*" + +# Show workflows in a specific project +tdx wf workflows my_workflow + +# List all workflows across projects +tdx wf workflows +``` + +**View workflow execution history:** +```bash +# List workflow sessions (runs) +tdx wf sessions my_workflow + +# Filter sessions by status +tdx wf sessions my_workflow --status error +tdx wf sessions my_workflow --status running + +# Filter sessions by time range +tdx wf sessions --from "2024-01-01" --to "2024-01-31" + +# List workflow attempts +tdx wf attempts my_workflow + +# Show details of a specific attempt +tdx wf attempt + +# Show tasks for an attempt +tdx wf attempt tasks + +# Show tasks including subtasks +tdx wf attempt tasks --include-subtasks + +# View task logs +tdx wf attempt logs +task_name +``` + +**Delete workflows:** +```bash +# Delete a workflow project +tdx wf delete my_workflow + +# Delete without confirmation +tdx wf delete my_workflow -y +``` + +**Download workflow project:** +```bash +# Download workflow project from TD +tdx wf download my_workflow + +# Download to specific directory +tdx wf download my_workflow ./local_backup + +# Download specific revision +tdx wf download my_workflow --revision v1.0.0 +``` + +### Database Management + +**Manage databases for workflows:** +```bash +# List all databases +tdx databases + +# Filter databases by pattern +tdx databases "prod_*" + +# List databases on a specific site +tdx databases --site jp01 +``` + +**Note:** Database creation is typically done via TD Console or API. Use `tdx databases` to verify databases exist before workflow execution. + +### Retry and Recovery + +**Retry failed workflows:** +```bash +# Retry an attempt +tdx wf attempt retry + +# Retry from a specific task +tdx wf attempt retry --resume-from +step_name + +# Retry with parameter overrides +tdx wf attempt retry --params '{"key":"value"}' + +# Force retry without confirmation +tdx wf attempt retry --force -y +``` + +**Kill running workflows:** +```bash +# Kill a running attempt +tdx wf attempt kill + +# Kill with reason +tdx wf attempt kill --reason "manual stop" -y +``` + +## Best Practices + +1. **Use descriptive task names** with `+prefix` +2. **Always set timezone** at workflow level +3. **Use session variables** for dynamic dates +4. **Externalize SQL queries** into separate files in `queries/` directory +5. **Add error handlers** with `_error:` for critical tasks +6. **Use retry logic** for potentially flaky operations +7. **Implement parallel execution** where tasks are independent +8. **Store secrets** in TD parameter store, not in workflow files +9. **Add logging** with echo or shell tasks for debugging +10. **Test workflows** by pushing to a development project before production +11. **Use `_export`** for common parameters (especially `td.database`) +12. **Document workflows** with comments +13. **Use `create_table:`** parameter to manage table creation/replacement +14. **Organize queries** in `queries/` directory, scripts in `scripts/` + +## Workflow Development Cycle + +The typical workflow development cycle: + +1. **Create project directory** with workflow file and queries +2. **Push to dev project** - Push with `tdx wf push` to register +3. **Trigger test run** - Use `tdx wf run project.workflow` to immediately run +4. **Monitor execution** - Use returned `attempt_id` with `tdx wf attempt tasks` and `tdx wf attempt logs` +5. 
**Verify results** - Check created tables with `tdx describe` +6. **Iterate** - Edit workflow and queries, re-push and re-run to test +7. **Monitor scheduled runs** - Check execution with `tdx wf sessions` and `tdx wf attempts` +8. **Promote to production** - Push to production project when ready + +**Example development workflow:** +```bash +# Create project directory +mkdir my_workflow && cd my_workflow + +# Create workflow file +cat > my_workflow.dig << 'EOF' +timezone: UTC +schedule: + daily>: 02:00:00 + +_export: + td: + database: analytics + ++analyze: + td>: queries/analysis.sql + create_table: results +EOF + +# Create queries directory +mkdir queries + +# Create query file +cat > queries/analysis.sql << 'EOF' +SELECT COUNT(*) as cnt +FROM events +WHERE TD_INTERVAL(time, '-1d', 'UTC') +EOF + +# Push workflow to TD (registers and schedules) +tdx wf push . + +# Immediately run the workflow and get attempt_id +tdx wf run my_workflow.my_workflow +# Output: "Started session attempt_id: 12345678" + +# Monitor execution using the attempt_id +tdx wf attempt 12345678 tasks +tdx wf attempt 12345678 logs +analyze + +# Verify table was created after successful completion +tdx describe analytics.results + +# Monitor scheduled runs +tdx wf sessions my_workflow +tdx wf attempts my_workflow +``` + +## Debugging Workflows + +### Testing Workflows + +Test workflow execution: +```bash +# Push workflow to TD for testing +tdx wf push my_workflow + +# Immediately run the workflow +tdx wf run my_workflow.workflow_name +# Output: "Started session attempt_id: 12345678" + +# Monitor task progress using returned attempt_id +tdx wf attempt 12345678 tasks + +# View logs for a specific task +tdx wf attempt 12345678 logs +task_name + +# Check if tables were created after execution +tdx describe database_name.table_name + +# View recent job history in TD Console +# Jobs will appear at: console.treasuredata.com/app/jobs +``` + +### Common Issues + +**"Task failed with exit code 1"** +- Check task logs in TD console or with `tdx wf attempt logs +task_name` +- Verify SQL syntax if using td> operator +- Check file paths for external scripts +- Verify database and table names + +**"Session variable not found"** +- Ensure variable exists in session context +- Check variable syntax: `${variable_name}` +- Verify _export section + +**"Parallel tasks not running in parallel"** +- Ensure `_parallel: true` is set +- Check task dependencies + +**"Query timeout"** +- Increase query timeout in td> operator +- Optimize query performance +- Consider breaking into smaller tasks + +**"Database not found"** +- List available databases with `tdx databases` +- Create database via TD Console if needed +- Verify database name in `_export.td.database` + +### Monitoring Workflows + +**Check workflow status:** +```bash +# List all workflow projects +tdx wf projects + +# List workflow runs (sessions) +tdx wf sessions my_workflow + +# Filter by status +tdx wf sessions my_workflow --status error + +# Show specific attempt details +tdx wf attempt + +# Show tasks for an attempt +tdx wf attempt tasks + +# View task logs +tdx wf attempt logs +task_name + +# Check job status in TD Console +# Visit: console.treasuredata.com/app/jobs +``` + +**View workflow definition:** +```bash +# Show registered workflows in a project +tdx wf workflows my_workflow +``` + +## Advanced Features + +### Event-Triggered Workflows + +Start a workflow automatically when another workflow completes successfully using the `trigger:` directive: + +```yaml +# 
subsequent_workflow.dig +# This workflow waits for test_workflow_1 to complete successfully + +trigger: + attempt>: + dependent_workflow_name: test_workflow_1 + dependent_project_name: test_project_1 + ++start: + echo>: "This runs after test_workflow_1 succeeds" + ++process: + td>: queries/process_data.sql + create_table: processed_results +``` + +**Key points:** +- Add `trigger:` directive to the **subsequent workflow** (the one that waits) +- `attempt>:` parameter is required (allows future expansion of trigger types) +- `dependent_workflow_name`: Name of the workflow that must complete successfully +- `dependent_project_name`: Name of the project containing the dependent workflow +- Triggers only on **success** (not on failure) +- Works regardless of how the preceding workflow starts (manual, scheduled, or triggered) +- Cannot wait for multiple workflows (use `td_wait`, `s3_wait`, or `http` operators instead) +- SLA directive timing starts only after the preceding workflow finishes + +**Example use case - Activation after segment refresh:** + +```yaml +# activation_workflow.dig +# Triggers after daily segment refresh completes + +timezone: Asia/Tokyo + +trigger: + attempt>: + dependent_workflow_name: segment_refresh + dependent_project_name: customer_segments + ++activate_campaign: + td>: queries/activate_to_destination.sql + ++send_notification: + sh>: python scripts/notify_completion.py +``` + +### Conditional Branching + +```yaml ++check_data: + td>: + query: "SELECT COUNT(*) as cnt FROM source_table WHERE date = '${session_date}'" + store_last_results: true + ++process_if_data_exists: + if>: ${td.last_results.cnt > 0} + _do: + +process: + td>: queries/process_data.sql + _else_do: + +skip: + echo>: "No data for ${session_date}, skipping" +``` + +### Loop Operator + +```yaml ++process_multiple: + for_each>: + region: [US, EU, ASIA] + _do: + +process_region: + td>: queries/process_by_region.sql + create_table: ${region}_data +``` + +### Workflow Call + +Call another workflow: +```yaml ++run_subworkflow: + call>: common/data_validation.dig + params: + table_name: my_table + date: ${session_date} +``` + +## Resources + +- Treasure Workflow Quick Start: https://docs.treasuredata.com/articles/#!pd/treasure-workflow-quick-start-using-td-toolbelt-in-a-cli +- Digdag documentation (underlying engine): https://docs.digdag.io/ +- TD workflow guide: Check internal TD documentation +- Operator reference: https://docs.digdag.io/operators.html +- Session variables: https://docs.digdag.io/workflow_definition.html#session-variables + +## tdx Workflow Command Reference + +| Command | Description | +|---------|-------------| +| `tdx wf projects` | List all workflow projects | +| `tdx wf workflows [project]` | List workflows (optionally for a project) | +| `tdx wf run .` | Immediately run a workflow, returns attempt_id | +| `tdx wf sessions [project]` | List workflow sessions | +| `tdx wf attempts [project]` | List workflow attempts | +| `tdx wf attempt ` | Show attempt details | +| `tdx wf attempt tasks` | Show tasks for an attempt | +| `tdx wf attempt logs [+task]` | View task logs (interactive selector if no task specified) | +| `tdx wf attempt kill` | Kill a running attempt | +| `tdx wf attempt retry` | Retry an attempt | +| `tdx wf download ` | Download workflow project | +| `tdx wf push ` | Push workflow to TD | +| `tdx wf delete ` | Delete workflow project | diff --git a/skills/workflow-management/SKILL.md b/skills/workflow-management/SKILL.md new file mode 100644 index 0000000..51f4495 --- 
--- /dev/null
+++ b/skills/workflow-management/SKILL.md
@@ -0,0 +1,526 @@
+---
+name: workflow-management
+description: Expert assistance for managing, debugging, monitoring, and optimizing Treasure Data workflows. Use this skill when users need help troubleshooting workflow failures, improving performance, or implementing workflow best practices.
+---
+
+# Treasure Workflow Management Expert
+
+Expert assistance for managing and optimizing Treasure Workflow (Treasure Data's workflow orchestration tool).
+
+## When to Use This Skill
+
+Use this skill when:
+- Debugging workflow failures or errors
+- Optimizing workflow performance
+- Monitoring workflow execution
+- Implementing workflow alerting and notifications
+- Managing workflow dependencies
+- Troubleshooting scheduling issues
+- Performing workflow maintenance and updates
+
+## Core Management Tasks
+
+### 1. Workflow Monitoring
+
+**Check workflow status:**
+```bash
+# List all workflow projects
+tdx wf projects
+
+# Show workflows in a specific project
+tdx wf workflows <project>
+
+# Immediately run a workflow and get attempt_id for monitoring
+tdx wf run <project>.<workflow>
+# Output: "Started session attempt_id: 12345678"
+
+# Use returned attempt_id to monitor task status
+tdx wf attempt 12345678 tasks
+
+# View logs for specific tasks
+tdx wf attempt 12345678 logs +task_name
+
+# List recent runs (sessions)
+tdx wf sessions
+
+# Filter sessions by status
+tdx wf sessions --status error
+tdx wf sessions --status running
+
+# View specific attempt details
+tdx wf attempt <attempt_id>
+```
+
+### 2. Debugging Failed Workflows
+
+**Investigate failure:**
+```bash
+# Get attempt details
+tdx wf attempt <attempt_id>
+
+# Show tasks for an attempt
+tdx wf attempt <attempt_id> tasks
+
+# View task logs
+tdx wf attempt <attempt_id> logs +task_name
+
+# Include subtasks in task list
+tdx wf attempt <attempt_id> tasks --include-subtasks
+```
+
+**Common debugging steps:**
+
+1. **Check error message** in logs
+2. **Verify query syntax** if td> operator failed
+3. **Check time ranges** - ensure data exists for session date
+4. **Validate dependencies** - check if upstream tasks completed
+5. **Review parameter values** - verify session variables are correct
+6. **Check resource limits** - query memory, timeout issues
+
+### 3. Query Performance Issues
+
+**Identify slow queries:**
+```yaml
++monitor_query:
+  td>: queries/analysis.sql
+  # Add job monitoring
+  store_last_results: true
+
++check_performance:
+  py>: scripts.check_query_performance.main
+  job_id: ${td.last_job_id}
+```
+
+**Optimization checklist:**
+- Add time filters (TD_TIME_RANGE) - see the sketch below
+- Use approximate aggregations (APPROX_DISTINCT)
+- Reduce JOIN complexity
+- Select only needed columns
+- Add query hints for large joins
+- Consider breaking into smaller tasks
+- Use appropriate engine (Presto vs Hive)
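+
+A sketch applying several checklist items at once (a time filter, an approximate distinct count, and column pruning). The table and column names are placeholders, the inline `query:` style mirrors the examples used elsewhere in this skill, and a scheduled session is assumed so that `${last_session_date}` is defined:
+
+```yaml
++daily_unique_users:
+  td>:
+  query: |
+    -- only the needed columns, a time filter, and an approximate count
+    SELECT
+      TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'UTC') AS dt,
+      APPROX_DISTINCT(user_id) AS approx_users
+    FROM events
+    WHERE TD_TIME_RANGE(time, '${last_session_date}', '${session_date}', 'UTC')
+    GROUP BY 1
+  create_table: daily_unique_users
+```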
+
+### 4. Workflow Alerting
+
+**Slack notification on failure:**
+```yaml
++critical_task:
+  td>: queries/important_analysis.sql
+
+  _error:
+    +send_slack_alert:
+      sh>: |
+        curl -X POST ${secret:slack.webhook_url} \
+          -H 'Content-Type: application/json' \
+          -d '{
+            "text": "Workflow failed: '"${workflow_name}"'",
+            "attachments": [{
+              "color": "danger",
+              "fields": [
+                {"title": "Session", "value": "'"${session_id}"'", "short": true},
+                {"title": "Date", "value": "'"${session_date}"'", "short": true}
+              ]
+            }]
+          }'
+```
+
+**Email notification:**
+```yaml
++notify_completion:
+  py>: scripts.notifications.send_email
+  recipients: ["team@example.com"]
+  subject: "Workflow ${workflow_name} completed"
+  body: "Session ${session_id} completed successfully"
+
+  _error:
+    +notify_failure:
+      py>: scripts.notifications.send_email
+      recipients: ["oncall@example.com"]
+      subject: "ALERT: Workflow ${workflow_name} failed"
+      body: "Session ${session_id} failed. Check logs immediately."
+```
+
+### 5. Data Quality Checks
+
+**Implement validation tasks:**
+```yaml
++main_processing:
+  td>: queries/process_data.sql
+  create_table: processed_data
+
++validate_results:
+  td>:
+  query: |
+    SELECT
+      COUNT(*) as total_rows,
+      COUNT(DISTINCT user_id) as unique_users,
+      SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) as null_users
+    FROM processed_data
+  store_last_results: true
+
++check_quality:
+  py>: scripts.data_quality.validate
+  total_rows: ${td.last_results.total_rows}
+  null_users: ${td.last_results.null_users}
+  # Script should fail if quality checks don't pass
+```
+
+Python validation script:
+```python
+def validate(total_rows, null_users):
+    """Validate data quality"""
+    # Parameters arrive from the workflow as strings, so normalize them first
+    total_rows = int(total_rows)
+    null_users = int(null_users)
+
+    if total_rows == 0:
+        raise Exception("No data processed")
+
+    if null_users > total_rows * 0.01:  # More than 1% nulls
+        raise Exception(f"Too many null users: {null_users}")
+
+    return {"status": "passed"}
+```
+
+### 6. Dependency Management
+
+**Workflow dependencies:**
+```yaml
+# workflows/upstream.dig
++produce_data:
+  td>: queries/create_source_data.sql
+  create_table: source_data_${session_date_compact}
+```
+
+```yaml
+# workflows/downstream.dig
+schedule:
+  daily>: 04:00:00  # Runs after upstream (3:00)
+
+_export:
+  requires:
+    - upstream_workflow  # Wait for upstream completion
+
++consume_data:
+  td>:
+  query: |
+    SELECT * FROM source_data_${session_date_compact}
+  create_table: processed_data
+```
+
+**Manual dependency with polling:**
+```yaml
++wait_for_upstream:
+  sh>: |
+    for i in {1..60}; do
+      if tdx describe production_db.source_data_${session_date_compact}; then
+        exit 0
+      fi
+      sleep 60
+    done
+    exit 1
+  retry: 3
+
++process_dependent_data:
+  td>: queries/dependent_processing.sql
+```
+
+### 7. Backfill Operations
+
+**Backfill for date range:**
+
+Use the `tdx wf attempt retry` command to re-run workflows for specific attempts, or use the TD Console to trigger manual runs with custom parameters.
+
+```bash
+# Retry an attempt
+tdx wf attempt <attempt_id> retry
+
+# Retry from a specific task
+tdx wf attempt <attempt_id> retry --resume-from +step_name
+
+# Retry with parameter overrides
+tdx wf attempt <attempt_id> retry --params '{"session_date":"2024-01-15"}'
+```
+
+**Backfill workflow pattern:**
+```yaml
+# backfill.dig
++backfill:
+  for_each>:
+    dates:
+      - 2024-01-01
+      - 2024-01-02
+      - 2024-01-03
+      # ... more dates
+  _do:
+    +process_date:
+      call>: main_workflow.dig
+      params:
+        session_date: ${dates}
+```
+
+### 8. Workflow Versioning
+
+**Best practices for updates:**
+
+1. **Test in development environment first**
+2. **Use version comments:**
+```yaml
+# Version: 2.1.0
+# Changes: Added data quality validation
+# Date: 2024-01-15
+
+timezone: Asia/Tokyo
+```
+
+3. **Keep backup of working version:**
+```bash
+# Download current version from TD before making changes
+tdx wf download my_workflow ./backup
+
+# Or create local backup
+cp workflow.dig workflow.dig.backup.$(date +%Y%m%d)
+```
+
+4. **Gradual rollout for critical workflows:**
+```yaml
+# Run new version in parallel with old version
++new_version:
+  td>: queries/new_processing.sql
+  create_table: results_v2
+
++old_version:
+  td>: queries/old_processing.sql
+  create_table: results_v1
+
++compare_results:
+  td>:
+  query: |
+    SELECT
+      (SELECT COUNT(*) FROM results_v1) as v1_count,
+      (SELECT COUNT(*) FROM results_v2) as v2_count
+  store_last_results: true
+```
+
+### 9. Resource Optimization
+
+**Query resource management:**
+```yaml
++large_query:
+  td>: queries/heavy_processing.sql
+  # Set query priority (-2 lowest to 2 highest; 0 is the default)
+  priority: 0
+
+  # Export query results to a result connection
+  result_connection: ${td.database}:result_table
+
+  # Engine settings
+  engine: presto
+  engine_version: stable
+```
+
+**Parallel task optimization:**
+```yaml
+# Limit parallelism to avoid resource exhaustion
++process_many:
+  for_each>:
+    batch: ["batch_1", "batch_2", "batch_3", "batch_4", "batch_5"]
+  _parallel:
+    limit: 2  # Only run 2 tasks in parallel
+  _do:
+    +process_batch:
+      td>: queries/process_batch.sql
+      create_table: ${batch}_results
+```
+
+### 10. Monitoring and Metrics
+
+**Collect workflow metrics:**
+```yaml
++workflow_start:
+  py>: scripts.metrics.record_start
+  workflow: ${workflow_name}
+  session: ${session_id}
+
++main_work:
+  td>: queries/main_query.sql
+
++workflow_end:
+  py>: scripts.metrics.record_completion
+  workflow: ${workflow_name}
+  session: ${session_id}
+  duration: ${session_duration}
+
+  _error:
+    +record_failure:
+      py>: scripts.metrics.record_failure
+      workflow: ${workflow_name}
+      session: ${session_id}
+```
+
+**Metrics tracking script:**
+```python
+import pytd
+from datetime import datetime
+
+def record_start(workflow, session):
+    client = pytd.Client(database='monitoring')
+    client.query(f"""
+        INSERT INTO workflow_metrics
+        VALUES (
+            '{workflow}',
+            '{session}',
+            {int(datetime.now().timestamp())},
+            NULL,
+            'running'
+        )
+    """)
+
+def record_completion(workflow, session, duration):
+    client = pytd.Client(database='monitoring')
+    client.query(f"""
+        UPDATE workflow_metrics
+        SET end_time = {int(datetime.now().timestamp())},
+            status = 'completed'
+        WHERE workflow = '{workflow}'
+          AND session_id = '{session}'
+    """)
+```
+
+## Common Issues and Solutions
+
+### Issue: Workflow Runs Too Long
+
+**Solutions:**
+1. Break into smaller parallel tasks
+2. Optimize queries (add time filters, use APPROX functions)
+3. Use incremental processing instead of full refresh (see the sketch below)
+4. Consider Presto instead of Hive for faster execution
+5. Add indexes if querying external databases
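+
+For item 3, a sketch of incremental processing: append only the most recent time window to the target table instead of rebuilding it. The table names are placeholders, `TD_INTERVAL` is the same helper used elsewhere in these skills, and the td> operator's `insert_into` option is assumed to be available:
+
+```yaml
++append_latest_window:
+  td>:
+  query: |
+    -- pick up only the previous day's data, not the full history
+    SELECT *
+    FROM raw_events
+    WHERE TD_INTERVAL(time, '-1d', 'UTC')
+  insert_into: events_rollup
+```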
+
+### Issue: Frequent Timeouts
+
+**Solutions:**
+```yaml
++long_running_query:
+  td>: queries/complex_analysis.sql
+  timeout: 3600s  # Increase timeout to 1 hour
+  retry: 2
+  retry_wait: 300s
+```
+
+### Issue: Intermittent Failures
+
+**Solutions:**
+```yaml
++flaky_task:
+  td>: queries/external_api_call.sql
+  retry: 5
+  retry_wait: 60s
+  retry_wait_multiplier: 2.0  # Exponential backoff
+```
+
+### Issue: Data Not Available
+
+**Solutions:**
+```yaml
++wait_for_data:
+  sh>: |
+    # Wait up to 30 minutes for data
+    for i in {1..30}; do
+      COUNT=$(tdx query -d analytics "SELECT COUNT(*) FROM source WHERE date='${session_date}'" --format csv | tail -1)
+      if [ "$COUNT" -gt 0 ]; then
+        exit 0
+      fi
+      sleep 60
+    done
+    exit 1
+
++process_data:
+  td>: queries/process.sql
+```
+
+A `td_wait>`-based alternative to this polling loop is sketched at the end of this section.
+
+### Issue: Out of Memory
+
+**Solutions:**
+1. Reduce query complexity
+2. Add better filters to reduce data volume
+3. Use sampling for analysis
+4. Split into multiple smaller queries
+5. Increase query resources (contact TD admin)
+
+### Issue: Duplicate Runs
+
+**Solutions:**
+```yaml
+# Use idempotent operations
++safe_insert:
+  td>:
+  query: |
+    DELETE FROM target_table
+    WHERE date = '${session_date}';
+
+    INSERT INTO target_table
+    SELECT * FROM source_table
+    WHERE date = '${session_date}'
+```
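+
+For the "Data Not Available" issue above, the TD-provided `td_wait>` operator can replace the manual shell polling loop: it re-runs a check query on an interval and lets downstream tasks start only once the query returns true. A sketch, assuming `td_wait>` is enabled for the account and using placeholder file, database, and table names:
+
+```yaml
++wait_for_source:
+  td_wait>: queries/source_ready.sql
+  # queries/source_ready.sql (placeholder) would contain something like:
+  #   SELECT COUNT(1) > 0 FROM source WHERE TD_INTERVAL(time, '-1d', 'UTC')
+
++process_data:
+  td>: queries/process.sql
+```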
+
+## Best Practices
+
+1. **Implement comprehensive error handling** for all critical tasks
+2. **Add logging** at key workflow stages
+3. **Monitor query performance** regularly
+4. **Set up alerts** for failures and SLA violations
+5. **Use idempotent operations** to handle reruns safely
+6. **Document workflow dependencies** clearly
+7. **Implement data quality checks** after processing
+8. **Keep workflows modular** for easier maintenance
+9. **Version control workflows** in git
+10. **Test changes** in dev environment first
+11. **Monitor resource usage** and optimize
+12. **Set appropriate timeouts** and retries
+13. **Use meaningful task names** for debugging
+14. **Archive old workflow versions** for rollback capability
+
+## Maintenance Checklist
+
+Weekly:
+- Review failed workflow sessions
+- Check query performance trends
+- Monitor resource utilization
+- Review alert patterns
+
+Monthly:
+- Clean up old temporary tables
+- Review and optimize slow workflows
+- Update documentation
+- Review and update dependencies
+- Check for deprecated features
+
+Quarterly:
+- Performance audit of all workflows
+- Review workflow architecture
+- Update error handling patterns
+- Security review (secrets, access)
+
+## Resources
+
+- TD Console: Access workflow logs and monitoring
+- Treasure Workflow Quick Start: https://docs.treasuredata.com/articles/#!pd/treasure-workflow-quick-start-using-td-toolbelt-in-a-cli
+- tdx CLI: Command-line workflow management using `tdx wf` commands
+- Query performance: Use EXPLAIN for query optimization
+- Internal docs: Check TD internal documentation for updates
+
+## tdx Workflow Command Reference
+
+| Command | Description |
+|---------|-------------|
+| `tdx wf projects` | List all workflow projects |
+| `tdx wf workflows [project]` | List workflows (optionally for a project) |
+| `tdx wf run <project>.<workflow>` | Immediately run a workflow, returns attempt_id |
+| `tdx wf sessions [project]` | List workflow sessions |
+| `tdx wf attempts [project]` | List workflow attempts |
+| `tdx wf attempt <attempt_id>` | Show attempt details |
+| `tdx wf attempt <attempt_id> tasks` | Show tasks for an attempt |
+| `tdx wf attempt <attempt_id> logs [+task]` | View task logs (interactive selector if no task specified) |
+| `tdx wf attempt <attempt_id> kill` | Kill a running attempt |
+| `tdx wf attempt <attempt_id> retry` | Retry an attempt |
+| `tdx wf download <project>` | Download workflow project |
+| `tdx wf push <project_dir>` | Push workflow to TD |
+| `tdx wf delete <project>` | Delete workflow project |