Initial commit

Zhongwei Li
2025-11-30 09:03:06 +08:00
commit 73652b90f9
8 changed files with 2794 additions and 0 deletions

skills/dbt/SKILL.md

@@ -0,0 +1,817 @@
---
name: dbt
description: Expert guidance for using dbt (data build tool) with Treasure Data Trino. Use this skill when users need help setting up dbt with TD, creating models, using TD-specific macros, handling incremental models, or troubleshooting dbt-trino adapter issues.
---
# dbt with Treasure Data Trino
Expert assistance for using dbt (data build tool) with Treasure Data's Trino engine.
## When to Use This Skill
Use this skill when:
- Setting up dbt with Treasure Data Trino
- Creating dbt models for TD
- Writing TD-specific dbt macros
- Implementing incremental models with TD_INTERVAL
- Troubleshooting dbt-trino adapter errors
- Overriding dbt-trino macros for TD compatibility
- Managing dbt projects with TD data pipelines
## Prerequisites
### Installation
**Recommended: Using uv (modern Python package manager):**
`uv` is a fast, modern Python package and environment manager written in Rust. It's significantly faster than traditional pip and provides better dependency resolution.
```bash
# Install uv (choose one):
# Option 1: Homebrew (recommended for Mac)
brew install uv
# Option 2: Standalone installer
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create and activate virtual environment with uv
uv venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install dbt-core and dbt-trino (much faster than pip)
uv pip install dbt-core dbt-trino==1.9.3
# Verify installation
dbt --version
```
**Benefits of uv:**
- **10-100x faster** than pip for package installation
- **Better dependency resolution** with clearer error messages
- **Drop-in replacement** for pip (use `uv pip` instead of `pip`)
- **Built-in virtual environment management** with `uv venv`
**Alternative: Using traditional pip and venv:**
```bash
# Create virtual environment (recommended)
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Note: brew install dbt doesn't work well on macOS
# Install dbt-core and dbt-trino
pip install dbt-core dbt-trino==1.9.3
# Verify installation
dbt --version
# Expected output:
# Core: 1.10.9
# Plugins: trino: 1.9.3
```
### TD Connection Setup
Create `profiles.yml` (can be in `~/.dbt/profiles.yml` or at project root):
```yaml
td:
target: dev
outputs:
dev:
type: trino
method: none # Use 'none' for API key authentication
user: "{{ env_var('TD_API_KEY') }}" # TD API key from environment variable
password: dummy # Password is not used with API key
host: api-presto.treasuredata.com
port: 443
database: td # Always 'td' for Treasure Data
schema: your_dev_database # Your dev TD database (e.g., 'dev_analytics')
threads: 4
http_scheme: https
session_properties:
query_max_run_time: 1h
prod:
type: trino
method: none
user: "{{ env_var('TD_API_KEY') }}"
password: dummy
host: api-presto.treasuredata.com
port: 443
database: td
schema: your_prod_database # Your prod TD database (e.g., 'production')
threads: 4
http_scheme: https
session_properties:
query_max_run_time: 1h
```
**Important TD-specific settings:**
- `method`: Set to `none` for API key authentication (not `ldap`)
- `user`: Use TD API key from `TD_API_KEY` environment variable
- `password`: Set to `dummy` (not used with API key authentication)
- `host`: Always `api-presto.treasuredata.com` (even though it's actually Trino)
- `database`: Always set to `td` for Treasure Data
- `schema`: Set to your actual TD database name (what you see in TD Console)
**Set up your TD API key:**
```bash
# Get your API key from TD Console: https://console.treasuredata.com/app/users
export TD_API_KEY="your_api_key_here"
# Or add to your shell profile (~/.bashrc, ~/.zshrc, etc.)
echo 'export TD_API_KEY="your_api_key_here"' >> ~/.zshrc
```
**Switch between dev and prod:**
```bash
# Run against dev (default)
dbt run
# Run against prod
dbt run --target prod
```
### dbt Project Configuration
Create or update `dbt_project.yml` with TD-specific settings:
```yaml
name: 'my_td_project'
version: '1.0.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: 'td'
# These configurations specify where dbt should look for different types of files.
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
- "target"
- "dbt_packages"
# SSL certificate validation (required for TD)
flags:
require_certificate_validation: true
# Global variable for default time range
vars:
target_range: '-3M/now' # Default: last 3 months to now
# Model configuration with TD-specific settings
models:
my_td_project:
+materialized: table
+on_schema_change: "append_new_columns" # Auto-add new columns instead of failing
+views_enabled: false # TD doesn't support views (use tables)
# Staging models
staging:
+materialized: table
+tags: ["staging"]
# Marts models
marts:
+materialized: table
+tags: ["marts"]
# Incremental models
incremental:
+materialized: incremental
+on_schema_change: "append_new_columns"
+tags: ["incremental"]
```
**Key TD-specific settings:**
- `flags.require_certificate_validation: true` - Required for SSL validation with TD
- `vars.target_range: '-3M/now'` - Default time range for all models using the variable
- `+on_schema_change: "append_new_columns"` - Automatically add new columns to existing tables (prevents rebuild on schema changes)
- `+views_enabled: false` - Explicitly disable views since TD doesn't support `CREATE VIEW`
**Benefits:**
- **SSL security**: Ensures certificate validation for secure TD connections
- **Schema evolution**: New columns are added automatically without dropping tables
- **Default time window**: All models using `{{ var('target_range') }}` get a sensible default
- **No views**: Prevents accidental view creation attempts
## Required TD-Specific Overrides
TD's Presto/Trino has limitations that require overriding some dbt-trino macros. You MUST create this file in your dbt project.
### Create `macros/override_dbt_trino.sql`
This file overrides dbt-trino macros to work with TD Presto/Trino limitations:
**Key changes:**
1. Removes table ownership queries (TD doesn't support)
2. Simplifies catalog queries
3. Replaces `CREATE VIEW` with `CREATE TABLE` (TD doesn't support views)
See the full macro file in [macros/override_dbt_trino.sql](./macros/override_dbt_trino.sql) in this skill directory.
**Why this is needed:**
- TD Presto doesn't support `CREATE VIEW` statements
- TD doesn't expose table ownership information
- Some information_schema queries need simplification
## TD-Specific dbt Macros
### 1. Incremental Scan Macro
For incremental models that process new data only:
```sql
-- macros/td_incremental_scan.sql
{% macro incremental_scan(table_name) -%}
(
SELECT * FROM {{ table_name }}
WHERE TD_INTERVAL(time, '{{ var("target_range", "-3M/now") }}')
{% if is_incremental() -%}
AND time > {{ get_max_time(this.table) }}
{%- endif %}
)
{%- endmacro %}
{% macro get_max_time(table_name) -%}
(SELECT MAX(time) FROM {{ table_name }})
{%- endmacro %}
```
**Default behavior:** Scans last 3 months to now (`-3M/now`) if no `target_range` variable is provided.
**Usage in model:**
```sql
-- models/incremental_events.sql
{{
config(
materialized='incremental',
unique_key='event_id'
)
}}
SELECT
event_id,
user_id,
event_type,
time
FROM {{ incremental_scan('raw_events') }}
```
**Run with default (last 3 months):**
```bash
dbt run --select incremental_events
```
**Or override with specific range:**
```bash
# Yesterday only
dbt run --vars '{"target_range": "-1d"}' --select incremental_events
# Last 7 days
dbt run --vars '{"target_range": "-7d/now"}' --select incremental_events
# Specific date range
dbt run --vars '{"target_range": "2024-01-01/2024-01-31"}' --select incremental_events
```
**Note:** No need to create wrapper macros for TD time functions - they're already simple enough to use directly in your SQL.
## dbt Model Patterns for TD
### Basic Model
```sql
-- models/daily_events.sql
{{
config(
materialized='table'
)
}}
SELECT
TD_TIME_STRING(time, 'd!', 'JST') as date,
event_type,
COUNT(*) as event_count,
approx_distinct(user_id) as unique_users
FROM {{ source('raw', 'events') }}
WHERE TD_INTERVAL(time, '-30d', 'JST')
GROUP BY 1, 2
```
### Incremental Model
```sql
-- models/incremental_user_events.sql
{{
config(
materialized='incremental',
unique_key='user_date_key'
)
}}
SELECT
CONCAT(CAST(user_id AS VARCHAR), '_', TD_TIME_STRING(time, 'd!', 'JST')) as user_date_key,
user_id,
TD_TIME_STRING(time, 'd!', 'JST') as date,
COUNT(*) as event_count
FROM {{ source('raw', 'events') }}
WHERE TD_INTERVAL(time, '{{ var('target_range', '-1d') }}', 'JST')
{% if is_incremental() %}
-- Only process data after last run
AND time > (SELECT MAX(time) FROM {{ this }})
{% endif %}
GROUP BY 1, 2, 3
```
### CTE (Common Table Expression) Pattern
```sql
-- models/user_metrics.sql
{{
config(
materialized='table'
)
}}
WITH events_filtered AS (
SELECT *
FROM {{ source('raw', 'events') }}
WHERE TD_INTERVAL(time, '-7d', 'JST')
),
user_sessions AS (
SELECT
user_id,
TD_SESSIONIZE(time, 1800, user_id) as session_id,
MIN(time) as session_start,
MAX(time) as session_end
FROM events_filtered
GROUP BY user_id, session_id
)
SELECT
user_id,
COUNT(DISTINCT session_id) as session_count,
AVG(session_end - session_start) as avg_session_duration
FROM user_sessions
GROUP BY user_id
```
## Sources Configuration
Define TD tables as sources:
```yaml
# models/sources.yml
version: 2
sources:
- name: raw
database: production
schema: default
tables:
- name: events
description: Raw event data from applications
columns:
- name: time
description: Event timestamp (Unix time)
- name: user_id
description: User identifier
- name: event_type
description: Type of event
- name: users
description: User profile data
```
**Usage in models:**
```sql
SELECT * FROM {{ source('raw', 'events') }}
```
## Testing with TD
### Schema Tests
```yaml
# models/schema.yml
version: 2
models:
- name: daily_events
description: Daily event aggregations
columns:
- name: date
description: Event date
tests:
- not_null
- unique
- name: event_count
description: Number of events
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: unique_users
description: Unique user count (approximate)
tests:
- not_null
```
### Custom TD Tests
```sql
-- tests/assert_positive_events.sql
-- Returns records that fail the test
SELECT *
FROM {{ ref('daily_events') }}
WHERE event_count < 0
```
## Running dbt with TD
### Basic Commands
```bash
# Test connection
dbt debug
# Run all models
dbt run
# Run specific model
dbt run --select daily_events
# Run with variables
dbt run --vars '{"target_range": "-7d"}'
# Run tests
dbt test
# Generate documentation
dbt docs generate
dbt docs serve
```
### Incremental Run Pattern
```bash
# Daily incremental run
dbt run --select incremental_events --vars '{"target_range": "-1d"}'
# Full refresh
dbt run --select incremental_events --full-refresh
# Backfill specific date
dbt run --select incremental_events --vars '{"target_range": "2024-01-15"}'
```
## Common Issues and Solutions
### Issue 1: "This connector does not support creating views"
**Error:**
```
TrinoUserError: This connector does not support creating views
```
**Solution:**
Add `macros/override_dbt_trino.sql` that overrides `trino__create_view_as` to use `CREATE TABLE` instead.
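The core of that override (the full version shipped with this skill also keeps the upstream `view_security` validation) simply emits a table DDL in place of the view DDL:
```sql
-- macros/override_dbt_trino.sql (excerpt)
{% macro trino__create_view_as(relation, sql) -%}
  create table if not exists
    {{ relation }}
  as
    {{ sql }}
  ;
{% endmacro %}
```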
### Issue 2: Catalog Query Failures
**Error:**
```
Database Error: Table ownership information not available
```
**Solution:**
Use the override macros that remove table ownership queries from catalog operations.
### Issue 3: Connection Timeout
**Error:**
```
Connection timeout
```
**Solution:**
Increase session timeout in `profiles.yml` if needed (default is 1h):
```yaml
session_properties:
query_max_run_time: 2h # Increase if queries legitimately need more time
```
### Issue 4: Incremental Model Not Working
**Problem:**
Incremental model processes all data every time.
**Solution:**
Ensure unique_key is set and check incremental logic:
```sql
{{
config(
materialized='incremental',
unique_key='event_id' -- Must be specified
)
}}
{% if is_incremental() %}
-- This block only runs on incremental runs
WHERE time > (SELECT MAX(time) FROM {{ this }})
{% endif %}
```
### Issue 5: Variable Not Found
**Error:**
```
Compilation Error: Var 'target_range' is undefined
```
**Solution:**
Provide default value:
```sql
WHERE TD_INTERVAL(time, '{{ var('target_range', '-1d') }}', 'JST')
```
Or pass variable:
```bash
dbt run --vars '{"target_range": "-1d"}'
```
## Project Structure
```
dbt_project/
├── dbt_project.yml # Project config with TD-specific settings
├── profiles.yml # Connection config (or in ~/.dbt/profiles.yml)
├── macros/
│ ├── override_dbt_trino.sql # Required TD overrides
│ └── td_incremental_scan.sql # Optional: Incremental helper
├── models/
│ ├── sources.yml # Source definitions
│ ├── schema.yml # Tests and documentation
│ ├── staging/
│ │ └── stg_events.sql
│ └── marts/
│ ├── daily_events.sql
│ └── user_metrics.sql
└── tests/
└── assert_positive_events.sql
```
**Note:** `profiles.yml` can be placed either:
- At project root (recommended for TD Workflow deployments)
- In `~/.dbt/profiles.yml` (for local development)
## Best Practices
1. **Include time filters in all models**
   - Use TD_INTERVAL or TD_TIME_RANGE directly (see the sketch after this list)
- Critical for performance on large tables
2. **Use incremental models wisely**
- Good for append-only event data
- Requires careful unique_key selection
- Test thoroughly before production
3. **Leverage sources**
- Define all TD tables as sources
- Enables lineage tracking
- Centralizes table documentation
4. **Use variables for flexibility**
- Date ranges
- Environment-specific settings
- Makes models reusable
5. **Test your models**
- Not null checks on key columns
- Unique checks on IDs
- Custom assertions for business logic
6. **Document everything**
- Model descriptions
- Column descriptions
- Include TD-specific notes
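For reference, the explicit-range form with TD_TIME_RANGE (start inclusive, end exclusive) looks like this; the table and columns are illustrative:
```sql
-- Explicit date window instead of a relative TD_INTERVAL offset
SELECT
    user_id,
    COUNT(*) as event_count
FROM {{ source('raw', 'events') }}
WHERE TD_TIME_RANGE(time, '2024-01-01', '2024-02-01', 'JST')
GROUP BY 1
```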
## Integration with TD Workflows
### Running dbt with Custom Scripts (Recommended for TD Workflow)
TD Workflow supports running dbt using Custom Scripts with Docker containers. This is the recommended approach for production deployments.
**Create a Python wrapper (`dbt_wrapper.py`):**
```python
#!/usr/bin/env python3
import sys
from dbt.cli.main import dbtRunner
def run_dbt(command_args):
"""Run dbt commands using dbtRunner"""
dbt = dbtRunner()
result = dbt.invoke(command_args)
if not result.success:
sys.exit(1)
return result
if __name__ == "__main__":
# Get command from arguments (e.g., ['run', '--target', 'prod'])
command = sys.argv[1:] if len(sys.argv) > 1 else ['run']
print(f"Running dbt with command: {' '.join(command)}")
run_dbt(command)
```
**Create workflow file (`dbt_workflow.dig`):**
```yaml
timezone: Asia/Tokyo
schedule:
daily>: 03:00:00
_export:
docker:
image: "treasuredata/customscript-python:3.12.11-td1"
# Set TD API key from secrets
_env:
TD_API_KEY: ${secret:td.apikey}
+setup:
py>: tasks.InstallPackages
+dbt_run:
py>: dbt_wrapper.run_dbt
command_args: ['run', '--target', 'prod']
+dbt_test:
py>: dbt_wrapper.run_dbt
command_args: ['test']
```
**Create package installer (`tasks.py`):**
```python
def InstallPackages():
"""Install dbt and dependencies at runtime"""
import subprocess
import sys
packages = [
'dbt-core==1.10.9',
'dbt-trino==1.9.3'
]
for package in packages:
subprocess.check_call([
sys.executable, '-m', 'pip', 'install', package
])
```
**Deploy to TD Workflow:**
```bash
# 1. Clean dbt artifacts
dbt clean
# 2. Push to TD Workflow
td workflow push my_dbt_project
# 3. Set TD API key secret
td workflow secrets --project my_dbt_project --set td.apikey=YOUR_API_KEY
# 4. Run from TD Console or trigger manually
td workflow start my_dbt_project dbt_workflow --session now
```
**Important notes:**
- Use Docker image: `treasuredata/customscript-python:3.12.11-td1` (latest stable image with Python 3.12)
- Install dependencies at runtime using `py>: tasks.InstallPackages`
- Store API key in TD secrets: `${secret:td.apikey}`
- Include your dbt project files (models, macros, profiles.yml, dbt_project.yml)
### Local Digdag + dbt Integration (Development)
For local development and testing:
```yaml
# workflow.dig
+dbt_run:
sh>: dbt run --vars '{"target_range": "${session_date}"}'
+dbt_test:
sh>: dbt test
```
### Scheduled dbt Runs
```yaml
# daily_dbt_workflow.dig
timezone: Asia/Tokyo
schedule:
daily>: 03:00:00
_export:
session_date: ${session_date}
+run_incremental_models:
sh>: |
cd /path/to/dbt_project
dbt run --select tag:incremental --vars '{"target_range": "-1d"}'
+run_tests:
sh>: |
cd /path/to/dbt_project
dbt test --select tag:incremental
+notify_completion:
echo>: "dbt run completed for ${session_date}"
```
## Advanced Patterns
### Dynamic Table Selection
```sql
-- models/flexible_aggregation.sql
{{
config(
materialized='table'
)
}}
{% set table_name = var('source_table', 'events') %}
{% set metric = var('metric', 'event_count') %}
SELECT
TD_TIME_STRING(time, 'd!', 'JST') as date,
COUNT(*) as {{ metric }}
FROM {{ source('raw', table_name) }}
WHERE TD_INTERVAL(time, '{{ var('target_range', '-7d') }}', 'JST')
GROUP BY 1
```
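To point the model at a different source table or metric name, pass the variables on the command line. The values below are illustrative, and `web_events` would need to be declared under the `raw` source:
```bash
dbt run --select flexible_aggregation \
  --vars '{"source_table": "web_events", "metric": "pageview_count", "target_range": "-30d"}'
```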
### Multi-Source Union
```sql
-- models/unified_events.sql
{{
config(
materialized='table'
)
}}
{% set sources = ['mobile_events', 'web_events', 'api_events'] %}
{# Name the loop variable something other than "source" so it does not shadow dbt's source() function #}
{% for src in sources %}
SELECT
    '{{ src }}' as source_type,
    *
FROM {{ source('raw', src) }}
WHERE TD_INTERVAL(time, '-1d', 'JST')
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
```
## Resources
- dbt Documentation: https://docs.getdbt.com/
- dbt-trino adapter: https://github.com/starburstdata/dbt-trino
- TD Query Engine: Use Trino-specific SQL
- TD Functions: TD_INTERVAL, TD_TIME_STRING, etc.
## Migration from SQL Scripts to dbt
If migrating existing TD SQL workflows to dbt:
1. **Convert queries to models**
- Add config block
- Use source() for table references
- Add TD-specific macros
2. **Add tests**
- Start with basic not_null tests
- Add unique key tests
- Create custom business logic tests
3. **Implement incrementally**
- Start with simple table materializations
- Add incremental models gradually
- Test each model thoroughly
4. **Update orchestration**
- Replace direct SQL in digdag with dbt commands
- Maintain existing schedules
- Add dbt test steps

skills/dbt/macros/override_dbt_trino.sql

@@ -0,0 +1,163 @@
-- Import from dbt-trino v1.7.1
-- https://github.com/starburstdata/dbt-trino/blob/v1.7.1/dbt/include/trino/macros/catalog.sql
-- To remove unnecessary parts that cause errors with TD Presto
-- https://github.com/starburstdata/dbt-trino/blob/1.4.latest/dbt/include/trino/macros/catalog.sql#L40-L59
-- https://github.com/starburstdata/dbt-trino/issues/298
{% macro trino__get_catalog(information_schema, schemas) -%}
{% set query %}
with tables as (
{{ trino__get_catalog_tables_sql(information_schema) }}
{{ trino__get_catalog_schemas_where_clause_sql(schemas) }}
),
columns as (
{{ trino__get_catalog_columns_sql(information_schema) }}
{{ trino__get_catalog_schemas_where_clause_sql(schemas) }}
)
{{ trino__get_catalog_results_sql() }}
{%- endset -%}
{{ return(run_query(query)) }}
{%- endmacro %}
{% macro trino__get_catalog_relations(information_schema, relations) -%}
{% set query %}
with tables as (
{{ trino__get_catalog_tables_sql(information_schema) }}
{{ trino__get_catalog_relations_where_clause_sql(relations) }}
),
columns as (
{{ trino__get_catalog_columns_sql(information_schema) }}
{{ trino__get_catalog_relations_where_clause_sql(relations) }}
)
{{ trino__get_catalog_results_sql() }}
{%- endset -%}
{{ return(run_query(query)) }}
{%- endmacro %}
{% macro trino__get_catalog_tables_sql(information_schema) -%}
select
table_catalog as "table_database",
table_schema as "table_schema",
table_name as "table_name",
table_type as "table_type",
null as "table_owner"
from {{ information_schema }}.tables
{%- endmacro %}
{% macro trino__get_catalog_columns_sql(information_schema) -%}
select
table_catalog as "table_database",
table_schema as "table_schema",
table_name as "table_name",
column_name as "column_name",
ordinal_position as "column_index",
data_type as "column_type",
comment as "column_comment"
from {{ information_schema }}.columns
{%- endmacro %}
{% macro trino__get_catalog_results_sql() -%}
select
table_database,
table_schema,
table_name,
table_type,
table_owner,
column_name,
column_index,
column_type,
column_comment
from tables
join columns using ("table_database", "table_schema", "table_name")
order by "column_index"
{%- endmacro %}
{% macro trino__get_catalog_schemas_where_clause_sql(schemas) -%}
where
table_schema != 'information_schema'
and
table_schema in ('{{ schemas | join("','") | lower }}')
{%- endmacro %}
{% macro trino__get_catalog_relations_where_clause_sql(relations) -%}
where
table_schema != 'information_schema'
and
(
{%- for relation in relations -%}
{% if relation.schema and relation.identifier %}
(
table_schema = '{{ relation.schema | lower }}'
and table_name = '{{ relation.identifier | lower }}'
)
{% elif relation.schema %}
(
table_schema = '{{ relation.schema | lower }}'
)
{% else %}
{% do exceptions.raise_compiler_error(
'`get_catalog_relations` requires a list of relations, each with a schema'
) %}
{% endif %}
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- endmacro %}
-- - get_catalog
-- - list_relations_without_caching
-- - get_columns_in_relation
-- Import from dbt-trino v1.1
-- https://github.com/starburstdata/dbt-trino/blob/1.1.latest/dbt/include/trino/macros/adapters.sql
-- To remove unnecessary parts that cause errors with TD Presto
-- https://github.com/starburstdata/dbt-trino/blob/1.4.latest/dbt/include/trino/macros/adapters.sql#L29-L48
-- https://github.com/starburstdata/dbt-trino/issues/298
{% macro trino__list_relations_without_caching(relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
table_catalog as database,
table_name as name,
table_schema as schema,
case when table_type = 'BASE TABLE' then 'table'
when table_type = 'VIEW' then 'view'
else table_type
end as table_type
from {{ relation.information_schema() }}.tables
where table_schema = '{{ relation.schema | lower }}'
{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}
-- Override dbt-trino "trino__create_view_as" macro with "create table if not exists"
-- https://github.com/starburstdata/dbt-trino/blob/1.4.latest/dbt/include/trino/macros/adapters.sql#L102-L115
-- To avoid unsupported "create view" action with TD Presto
-- Database Error in model dbt_results (models/dbt_results.sql)
-- TrinoUserError(type=USER_ERROR, name=NOT_SUPPORTED, message="This connector does not support creating views")
{% macro trino__create_view_as(relation, sql) -%}
{%- set view_security = config.get('view_security', 'definer') -%}
{%- if view_security not in ['definer', 'invoker'] -%}
{%- set log_message = 'Invalid value for view_security (%s) specified. Setting default value (%s).' % (view_security, 'definer') -%}
{% do log(log_message) %}
{%- set view_security = 'definer' -%}
{% endif %}
create table if not exists
{{ relation }}
as
{{ sql }}
;
{% endmacro %}

skills/dbt/macros/td_incremental_scan.sql

@@ -0,0 +1,17 @@
-- TD Incremental Scan Macro
-- Scans a table for a specific time range and optionally filters for incremental processing
-- Default target_range is '-3M/now' (last 3 months to now)
{% macro incremental_scan(table_name) -%}
(
SELECT * FROM {{ table_name }}
WHERE TD_INTERVAL(time, '{{ var("target_range", "-3M/now") }}')
{% if is_incremental() -%}
AND time > {{ get_max_time(this.table) }}
{%- endif %}
)
{%- endmacro %}
{% macro get_max_time(table_name) -%}
(SELECT MAX(time) FROM {{ table_name }})
{%- endmacro %}

skills/digdag/SKILL.md

File diff suppressed because it is too large.

skills/workflow-management/SKILL.md

@@ -0,0 +1,526 @@
---
name: workflow-management
description: Expert assistance for managing, debugging, monitoring, and optimizing Treasure Data workflows. Use this skill when users need help troubleshooting workflow failures, improving performance, or implementing workflow best practices.
---
# Treasure Workflow Management Expert
Expert assistance for managing and optimizing Treasure Workflow (Treasure Data's workflow orchestration tool).
## When to Use This Skill
Use this skill when:
- Debugging workflow failures or errors
- Optimizing workflow performance
- Monitoring workflow execution
- Implementing workflow alerting and notifications
- Managing workflow dependencies
- Troubleshooting scheduling issues
- Performing workflow maintenance and updates
## Core Management Tasks
### 1. Workflow Monitoring
**Check workflow status:**
```bash
# List all workflow projects
tdx wf projects
# Show workflows in a specific project
tdx wf workflows <project_name>
# Immediately run a workflow and get attempt_id for monitoring
tdx wf run <project_name>.<workflow_name>
# Output: "Started session attempt_id: 12345678"
# Use returned attempt_id to monitor task status
tdx wf attempt 12345678 tasks
# View logs for specific tasks
tdx wf attempt 12345678 logs +task_name
# List recent runs (sessions)
tdx wf sessions <project_name>
# Filter sessions by status
tdx wf sessions <project_name> --status error
tdx wf sessions <project_name> --status running
# View specific attempt details
tdx wf attempt <attempt_id>
```
### 2. Debugging Failed Workflows
**Investigate failure:**
```bash
# Get attempt details
tdx wf attempt <attempt_id>
# Show tasks for an attempt
tdx wf attempt <attempt_id> tasks
# View task logs
tdx wf attempt <attempt_id> logs +task_name
# Include subtasks in task list
tdx wf attempt <attempt_id> tasks --include-subtasks
```
**Common debugging steps:**
1. **Check error message** in logs
2. **Verify query syntax** if td> operator failed
3. **Check time ranges** - ensure data exists for session date
4. **Validate dependencies** - check if upstream tasks completed
5. **Review parameter values** - verify session variables are correct
6. **Check resource limits** - query memory, timeout issues
### 3. Query Performance Issues
**Identify slow queries:**
```yaml
+monitor_query:
td>: queries/analysis.sql
# Add job monitoring
store_last_results: true
+check_performance:
py>: scripts.check_query_performance.main
job_id: ${td.last_job_id}
```
**Optimization checklist** (a query sketch applying the first few items follows the list):
- Add time filters (TD_TIME_RANGE)
- Use approximate aggregations (APPROX_DISTINCT)
- Reduce JOIN complexity
- Select only needed columns
- Add query hints for large joins
- Consider breaking into smaller tasks
- Use appropriate engine (Presto vs Hive)
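A sketch applying the time filter, approximate distinct count, and narrow projection; the table and column names are illustrative:
```sql
-- Restrict the scan window, project only the needed columns, approximate the count
SELECT
    TD_TIME_STRING(time, 'd!', 'JST') as date,
    APPROX_DISTINCT(user_id) as unique_users
FROM events
WHERE TD_TIME_RANGE(time, '2024-01-01', '2024-01-08', 'JST')
GROUP BY 1
```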
### 4. Workflow Alerting
**Slack notification on failure:**
```yaml
+critical_task:
td>: queries/important_analysis.sql
_error:
+send_slack_alert:
sh>: |
curl -X POST ${secret:slack.webhook_url} \
-H 'Content-Type: application/json' \
-d '{
"text": "Workflow failed: '"${workflow_name}"'",
"attachments": [{
"color": "danger",
"fields": [
{"title": "Session", "value": "'"${session_id}"'", "short": true},
{"title": "Date", "value": "'"${session_date}"'", "short": true}
]
}]
}'
```
**Email notification:**
```yaml
+notify_completion:
py>: scripts.notifications.send_email
recipients: ["team@example.com"]
subject: "Workflow ${workflow_name} completed"
body: "Session ${session_id} completed successfully"
_error:
+notify_failure:
py>: scripts.notifications.send_email
recipients: ["oncall@example.com"]
subject: "ALERT: Workflow ${workflow_name} failed"
body: "Session ${session_id} failed. Check logs immediately."
```
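The `scripts.notifications.send_email` helper is not shown here; a minimal sketch using only the standard library could look like the following, where the SMTP host and sender address are placeholder assumptions (typically injected via `_env` or secrets):
```python
# scripts/notifications.py (sketch)
import os
import smtplib
from email.message import EmailMessage

def send_email(recipients, subject, body):
    """Send a plain-text notification email via SMTP."""
    msg = EmailMessage()
    msg["From"] = os.environ.get("MAIL_FROM", "workflows@example.com")  # placeholder sender
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject
    msg.set_content(body)

    # SMTP_HOST is assumed to be provided by the environment (e.g. via digdag _env)
    with smtplib.SMTP(os.environ.get("SMTP_HOST", "localhost")) as smtp:
        smtp.send_message(msg)
```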
### 5. Data Quality Checks
**Implement validation tasks:**
```yaml
+main_processing:
td>: queries/process_data.sql
create_table: processed_data
+validate_results:
td>:
query: |
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT user_id) as unique_users,
SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) as null_users
FROM processed_data
store_last_results: true
+check_quality:
py>: scripts.data_quality.validate
total_rows: ${td.last_results.total_rows}
null_users: ${td.last_results.null_users}
# Script should fail if quality checks don't pass
```
Python validation script:
```python
def validate(total_rows, null_users):
    """Validate data quality"""
    # Digdag passes parameters as strings; normalize to integers first
    total_rows = int(total_rows)
    null_users = int(null_users)
    if total_rows == 0:
        raise Exception("No data processed")
    if null_users > total_rows * 0.01:  # More than 1% nulls
        raise Exception(f"Too many null users: {null_users}")
    return {"status": "passed"}
```
### 6. Dependency Management
**Workflow dependencies:**
```yaml
# workflows/upstream.dig
+produce_data:
td>: queries/create_source_data.sql
create_table: source_data_${session_date_compact}
```
```yaml
# workflows/downstream.dig
schedule:
daily>: 04:00:00 # Runs after upstream (3:00)
+wait_for_upstream:
  require>: upstream_workflow  # Waits for upstream_workflow's session to complete (kicks it if it hasn't run)
+consume_data:
td>:
query: |
SELECT * FROM source_data_${session_date_compact}
create_table: processed_data
```
**Manual dependency with polling:**
```yaml
+wait_for_upstream:
sh>: |
for i in {1..60}; do
if tdx describe production_db.source_data_${session_date_compact}; then
exit 0
fi
sleep 60
done
exit 1
  _retry: 3
+process_dependent_data:
td>: queries/dependent_processing.sql
```
### 7. Backfill Operations
**Backfill for date range:**
Use the `tdx wf attempt <id> retry` command to re-run workflows for specific attempts, or use the TD Console to trigger manual runs with custom parameters.
```bash
# Retry an attempt
tdx wf attempt <attempt_id> retry
# Retry from a specific task
tdx wf attempt <attempt_id> retry --resume-from +step_name
# Retry with parameter overrides
tdx wf attempt <attempt_id> retry --params '{"session_date":"2024-01-15"}'
```
**Backfill workflow pattern:**
```yaml
# backfill.dig
+backfill:
  for_each>:
    backfill_date:
      - 2024-01-01
      - 2024-01-02
      - 2024-01-03
      # ... more dates
  _do:
    +process_date:
      # Pass the date to the called workflow via _export (exported params are
      # visible to the called workflow's tasks)
      _export:
        target_date: ${backfill_date}
      call>: main_workflow.dig
```
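To deploy and kick off the backfill, push the project and run the workflow with the tdx CLI (the project name is illustrative; see the command reference at the end of this skill):
```bash
tdx wf push my_project
tdx wf run my_project.backfill
```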
### 8. Workflow Versioning
**Best practices for updates:**
1. **Test in development environment first**
2. **Use version comments:**
```yaml
# Version: 2.1.0
# Changes: Added data quality validation
# Date: 2024-01-15
timezone: Asia/Tokyo
```
3. **Keep backup of working version:**
```bash
# Download current version from TD before making changes
tdx wf download my_workflow ./backup
# Or create local backup
cp workflow.dig workflow.dig.backup.$(date +%Y%m%d)
```
4. **Gradual rollout for critical workflows:**
```yaml
# Run new version in parallel with old version
+new_version:
td>: queries/new_processing.sql
create_table: results_v2
+old_version:
td>: queries/old_processing.sql
create_table: results_v1
+compare_results:
td>:
query: |
SELECT
(SELECT COUNT(*) FROM results_v1) as v1_count,
(SELECT COUNT(*) FROM results_v2) as v2_count
store_last_results: true
```
### 9. Resource Optimization
**Query resource management:**
```yaml
+large_query:
td>: queries/heavy_processing.sql
  # Set job priority (-2 = very low ... 2 = very high; 0 is the default)
  priority: 0
  # Write query results out through a result connection
  result_connection: ${td.database}:result_table
# Engine settings
engine: presto
engine_version: stable
```
**Parallel task optimization:**
```yaml
# Limit parallelism to avoid resource exhaustion
+process_many:
for_each>:
batch: ["batch_1", "batch_2", "batch_3", "batch_4", "batch_5"]
_parallel:
limit: 2 # Only run 2 tasks in parallel
_do:
+process_batch:
td>: queries/process_batch.sql
create_table: ${batch}_results
```
### 10. Monitoring and Metrics
**Collect workflow metrics:**
```yaml
+workflow_start:
py>: scripts.metrics.record_start
workflow: ${workflow_name}
session: ${session_id}
+main_work:
td>: queries/main_query.sql
+workflow_end:
py>: scripts.metrics.record_completion
workflow: ${workflow_name}
session: ${session_id}
duration: ${session_duration}
_error:
+record_failure:
py>: scripts.metrics.record_failure
workflow: ${workflow_name}
session: ${session_id}
```
**Metrics tracking script:**
```python
import pytd
from datetime import datetime
def record_start(workflow, session):
client = pytd.Client(database='monitoring')
client.query(f"""
INSERT INTO workflow_metrics
VALUES (
'{workflow}',
'{session}',
{int(datetime.now().timestamp())},
NULL,
'running'
)
""")
def record_completion(workflow, session, duration):
client = pytd.Client(database='monitoring')
client.query(f"""
UPDATE workflow_metrics
SET end_time = {int(datetime.now().timestamp())},
status = 'completed'
WHERE workflow = '{workflow}'
AND session_id = '{session}'
""")
```
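The script assumes a `workflow_metrics` table in the `monitoring` database roughly shaped like this (columns inferred from the INSERT above; adjust names and types to your conventions):
```sql
CREATE TABLE IF NOT EXISTS workflow_metrics (
    workflow   VARCHAR,
    session_id VARCHAR,
    start_time BIGINT,
    end_time   BIGINT,
    status     VARCHAR
)
```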
## Common Issues and Solutions
### Issue: Workflow Runs Too Long
**Solutions:**
1. Break into smaller parallel tasks
2. Optimize queries (add time filters, use APPROX functions)
3. Use incremental processing instead of full refresh
4. Consider Presto instead of Hive for faster execution
5. Add indexes if querying external databases
### Issue: Frequent Timeouts
**Solutions:**
```yaml
+long_running_query:
  td>: queries/complex_analysis.sql
  # Query runtime limits are enforced by the TD query engine rather than a digdag
  # task parameter; split or optimize queries that exceed them, and retry
  # transient timeouts with a delay between attempts
  _retry:
    limit: 2
    interval: 300            # Seconds between retries
    interval_type: constant
```
### Issue: Intermittent Failures
**Solutions:**
```yaml
+flaky_task:
  td>: queries/external_api_call.sql
  _retry:
    limit: 5
    interval: 60                 # Initial wait in seconds
    interval_type: exponential   # Exponential backoff: 60s, 120s, 240s, ...
```
### Issue: Data Not Available
**Solutions:**
```yaml
+wait_for_data:
sh>: |
# Wait up to 30 minutes for data
for i in {1..30}; do
COUNT=$(tdx query -d analytics "SELECT COUNT(*) FROM source WHERE date='${session_date}'" --format csv | tail -1)
if [ "$COUNT" -gt 0 ]; then
exit 0
fi
sleep 60
done
exit 1
+process_data:
td>: queries/process.sql
```
### Issue: Out of Memory
**Solutions:**
1. Reduce query complexity
2. Add better filters to reduce data volume
3. Use sampling for analysis (see the sketch after this list)
4. Split into multiple smaller queries
5. Increase query resources (contact TD admin)
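For the sampling suggestion, Trino's TABLESAMPLE clause lets you estimate metrics from a fraction of the rows; verify it behaves as expected on your TD account before relying on the numbers:
```sql
-- Estimate counts from roughly 1% of rows, then scale up
SELECT
    event_type,
    COUNT(*) * 100 as estimated_events
FROM events TABLESAMPLE BERNOULLI (1)
WHERE TD_TIME_RANGE(time, '2024-01-01', '2024-02-01', 'JST')
GROUP BY event_type
```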
### Issue: Duplicate Runs
**Solutions:**
```yaml
# Use idempotent operations
+safe_insert:
td>:
query: |
DELETE FROM target_table
WHERE date = '${session_date}';
INSERT INTO target_table
SELECT * FROM source_table
WHERE date = '${session_date}'
```
## Best Practices
1. **Implement comprehensive error handling** for all critical tasks
2. **Add logging** at key workflow stages
3. **Monitor query performance** regularly
4. **Set up alerts** for failures and SLA violations
5. **Use idempotent operations** to handle reruns safely
6. **Document workflow dependencies** clearly
7. **Implement data quality checks** after processing
8. **Keep workflows modular** for easier maintenance
9. **Version control workflows** in git
10. **Test changes** in dev environment first
11. **Monitor resource usage** and optimize
12. **Set appropriate timeouts** and retries
13. **Use meaningful task names** for debugging
14. **Archive old workflow versions** for rollback capability
## Maintenance Checklist
Weekly:
- Review failed workflow sessions
- Check query performance trends
- Monitor resource utilization
- Review alert patterns
Monthly:
- Clean up old temporary tables
- Review and optimize slow workflows
- Update documentation
- Review and update dependencies
- Check for deprecated features
Quarterly:
- Performance audit of all workflows
- Review workflow architecture
- Update error handling patterns
- Security review (secrets, access)
## Resources
- TD Console: Access workflow logs and monitoring
- Treasure Workflow Quick Start: https://docs.treasuredata.com/articles/#!pd/treasure-workflow-quick-start-using-td-toolbelt-in-a-cli
- tdx CLI: Command-line workflow management using `tdx wf` commands
- Query performance: Use EXPLAIN for query optimization
- Internal docs: Check TD internal documentation for updates
## tdx Workflow Command Reference
| Command | Description |
|---------|-------------|
| `tdx wf projects` | List all workflow projects |
| `tdx wf workflows [project]` | List workflows (optionally for a project) |
| `tdx wf run <project>.<workflow>` | Immediately run a workflow, returns attempt_id |
| `tdx wf sessions [project]` | List workflow sessions |
| `tdx wf attempts [project]` | List workflow attempts |
| `tdx wf attempt <id>` | Show attempt details |
| `tdx wf attempt <id> tasks` | Show tasks for an attempt |
| `tdx wf attempt <id> logs [+task]` | View task logs (interactive selector if no task specified) |
| `tdx wf attempt <id> kill` | Kill a running attempt |
| `tdx wf attempt <id> retry` | Retry an attempt |
| `tdx wf download <project>` | Download workflow project |
| `tdx wf push <project>` | Push workflow to TD |
| `tdx wf delete <project>` | Delete workflow project |