---
name: workflow-management
description: Expert assistance for managing, debugging, monitoring, and optimizing Treasure Data workflows. Use this skill when users need help troubleshooting workflow failures, improving performance, or implementing workflow best practices.
---

# Treasure Workflow Management Expert

Expert assistance for managing and optimizing Treasure Workflow (Treasure Data's workflow orchestration tool).

## When to Use This Skill

Use this skill when:
- Debugging workflow failures or errors
- Optimizing workflow performance
- Monitoring workflow execution
- Implementing workflow alerting and notifications
- Managing workflow dependencies
- Troubleshooting scheduling issues
- Performing workflow maintenance and updates

## Core Management Tasks

### 1. Workflow Monitoring

**Check workflow status:**
```bash
# List all workflow projects
tdx wf projects

# Show workflows in a specific project
tdx wf workflows <project_name>

# Immediately run a workflow and get attempt_id for monitoring
tdx wf run <project_name>.<workflow_name>
# Output: "Started session attempt_id: 12345678"

# Use returned attempt_id to monitor task status
tdx wf attempt 12345678 tasks

# View logs for specific tasks
tdx wf attempt 12345678 logs +task_name

# List recent runs (sessions)
tdx wf sessions <project_name>

# Filter sessions by status
tdx wf sessions <project_name> --status error
tdx wf sessions <project_name> --status running

# View specific attempt details
tdx wf attempt <attempt_id>
```

### 2. Debugging Failed Workflows

**Investigate failure:**
```bash
# Get attempt details
tdx wf attempt <attempt_id>

# Show tasks for an attempt
tdx wf attempt <attempt_id> tasks

# View task logs
tdx wf attempt <attempt_id> logs +task_name

# Include subtasks in task list
tdx wf attempt <attempt_id> tasks --include-subtasks
```

**Common debugging steps:**

1. **Check the error message** in the task logs
2. **Verify query syntax** if a `td>` operator failed
3. **Check time ranges** - ensure data exists for the session date
4. **Validate dependencies** - check that upstream tasks completed
5. **Review parameter values** - verify session variables are correct
6. **Check resource limits** - look for query memory or timeout errors

### 3. Query Performance Issues

**Identify slow queries:**
```yaml
+monitor_query:
  td>: queries/analysis.sql
  # Add job monitoring
  store_last_results: true

+check_performance:
  py>: scripts.check_query_performance.main
  job_id: ${td.last_job_id}
```
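
The `+check_performance` task above calls a script that the document does not show. The sketch below illustrates only the timing check such a script might perform. How the job's start and end times are fetched depends on the client library in use, so the function takes them as ISO 8601 strings. The names `check_job`, `job_duration_seconds`, and the 600-second threshold are hypothetical.

```python
from datetime import datetime

# Hypothetical threshold: flag any job that ran longer than 10 minutes.
SLOW_QUERY_SECONDS = 600


def job_duration_seconds(start_at, end_at):
    """Return elapsed seconds between two ISO 8601 timestamps."""
    started = datetime.fromisoformat(start_at)
    finished = datetime.fromisoformat(end_at)
    return (finished - started).total_seconds()


def check_job(job_id, start_at, end_at, threshold=SLOW_QUERY_SECONDS):
    """Summarize one job and mark it slow if it exceeded the threshold."""
    duration = job_duration_seconds(start_at, end_at)
    return {
        "job_id": str(job_id),
        "duration_seconds": duration,
        "slow": duration > threshold,
    }
```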

**Optimization checklist:**
- Add time filters (`TD_TIME_RANGE`)
- Use approximate aggregations (`APPROX_DISTINCT`)
- Reduce JOIN complexity
- Select only needed columns
- Add query hints for large joins
- Consider breaking into smaller tasks
- Use appropriate engine (Presto vs Hive)

### 4. Workflow Alerting

**Slack notification on failure:**
```yaml
+critical_task:
  td>: queries/important_analysis.sql

  _error:
    +send_slack_alert:
      sh>: |
        curl -X POST ${secret:slack.webhook_url} \
          -H 'Content-Type: application/json' \
          -d '{
            "text": "Workflow failed: '"${workflow_name}"'",
            "attachments": [{
              "color": "danger",
              "fields": [
                {"title": "Session", "value": "'"${session_id}"'", "short": true},
                {"title": "Date", "value": "'"${session_date}"'", "short": true}
              ]
            }]
          }'
```

**Email notification:**
```yaml
+notify_completion:
  py>: scripts.notifications.send_email
  recipients: ["team@example.com"]
  subject: "Workflow ${workflow_name} completed"
  body: "Session ${session_id} completed successfully"

_error:
  +notify_failure:
    py>: scripts.notifications.send_email
    recipients: ["oncall@example.com"]
    subject: "ALERT: Workflow ${workflow_name} failed"
    body: "Session ${session_id} failed. Check logs immediately."
```
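
The tasks above reference `scripts.notifications.send_email`, which is not shown. A minimal sketch using only the Python standard library could look like the following. The sender address and the `SMTP_HOST` / `SMTP_PORT` environment variables are assumptions for illustration; a real deployment would likely need authentication and TLS as well.

```python
import os
import smtplib
from email.message import EmailMessage


def build_message(recipients, subject, body,
                  sender="workflow-alerts@example.com"):
    """Build a plain-text email; the sender address is a placeholder."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject
    msg.set_content(body)
    return msg


def send_email(recipients, subject, body):
    """Send the message through an SMTP relay named by (assumed) env vars."""
    host = os.environ.get("SMTP_HOST", "localhost")
    port = int(os.environ.get("SMTP_PORT", "25"))
    msg = build_message(recipients, subject, body)
    with smtplib.SMTP(host, port) as smtp:
        smtp.send_message(msg)
```

Keeping message construction in `build_message` lets the formatting be checked without a mail server.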

### 5. Data Quality Checks

**Implement validation tasks:**
```yaml
+main_processing:
  td>: queries/process_data.sql
  create_table: processed_data

+validate_results:
  td>:
  query: |
    SELECT
      COUNT(*) as total_rows,
      COUNT(DISTINCT user_id) as unique_users,
      SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) as null_users
    FROM processed_data
  store_last_results: true

+check_quality:
  py>: scripts.data_quality.validate
  total_rows: ${td.last_results.total_rows}
  null_users: ${td.last_results.null_users}
  # Script should fail if quality checks don't pass
```

Python validation script:
```python
def validate(total_rows, null_users):
    """Validate data quality; raising an exception fails the workflow task."""
    # Values interpolated from ${td.last_results.*} may arrive as strings
    total_rows = int(total_rows)
    null_users = int(null_users)

    if total_rows == 0:
        raise Exception("No data processed")

    if null_users > total_rows * 0.01:  # More than 1% nulls
        raise Exception(f"Too many null users: {null_users}")

    return {"status": "passed"}
```

### 6. Dependency Management

**Workflow dependencies:**
```yaml
# workflows/upstream.dig
+produce_data:
  td>: queries/create_source_data.sql
  create_table: source_data_${session_date_compact}
```

```yaml
# workflows/downstream.dig
schedule:
  daily>: 04:00:00  # Runs after upstream (3:00)

+wait_for_upstream:
  # Run (or wait for) the workflow defined in upstream.dig for this session time
  require>: upstream

+consume_data:
  td>:
  query: |
    SELECT * FROM source_data_${session_date_compact}
  create_table: processed_data
```

**Manual dependency with polling:**
```yaml
+wait_for_upstream:
  sh>: |
    # Poll once a minute, for up to 60 minutes, until the table exists
    for i in {1..60}; do
      if tdx describe production_db.source_data_${session_date_compact}; then
        exit 0
      fi
      sleep 60
    done
    exit 1
  _retry: 3

+process_dependent_data:
  td>: queries/dependent_processing.sql
```

### 7. Backfill Operations

**Backfill for date range:**

Use the `tdx wf attempt <id> retry` command to re-run workflows for specific attempts, or use the TD Console to trigger manual runs with custom parameters.

```bash
# Retry an attempt
tdx wf attempt <attempt_id> retry

# Retry from a specific task
tdx wf attempt <attempt_id> retry --resume-from +step_name

# Retry with parameter overrides
tdx wf attempt <attempt_id> retry --params '{"session_date":"2024-01-15"}'
```

**Backfill workflow pattern:**
```yaml
# backfill.dig
+backfill:
  # for_each> iterates over a list of values; loop> only repeats a fixed count
  for_each>:
    dates:
      - 2024-01-01
      - 2024-01-02
      - 2024-01-03
      # ... more dates
  _do:
    +process_date:
      call>: main_workflow.dig
      params:
        session_date: ${dates}
```
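
Writing out a long date list by hand is error-prone. The helper below is a small sketch that generates the inclusive list of dates for a backfill range, which could then be pasted into the `dates:` list above. The function name `backfill_dates` is hypothetical and not part of any Treasure Data tooling.

```python
from datetime import date, timedelta


def backfill_dates(start, end):
    """Return every date from start to end (inclusive) as YYYY-MM-DD strings."""
    first = date.fromisoformat(start)
    last = date.fromisoformat(end)
    if last < first:
        raise ValueError(f"end {end} is before start {start}")
    span = (last - first).days
    return [(first + timedelta(days=n)).isoformat() for n in range(span + 1)]


if __name__ == "__main__":
    # Print the dates as YAML list items for the backfill workflow
    for day in backfill_dates("2024-01-01", "2024-01-03"):
        print(f"- {day}")
```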

### 8. Workflow Versioning

**Best practices for updates:**

1. **Test in development environment first**
2. **Use version comments:**
   ```yaml
   # Version: 2.1.0
   # Changes: Added data quality validation
   # Date: 2024-01-15

   timezone: Asia/Tokyo
   ```

3. **Keep backup of working version:**
   ```bash
   # Download current version from TD before making changes
   tdx wf download my_workflow ./backup

   # Or create local backup
   cp workflow.dig workflow.dig.backup.$(date +%Y%m%d)
   ```

4. **Gradual rollout for critical workflows:**
   ```yaml
   # Run new version in parallel with old version
   +new_version:
     td>: queries/new_processing.sql
     create_table: results_v2

   +old_version:
     td>: queries/old_processing.sql
     create_table: results_v1

   +compare_results:
     td>:
     query: |
       SELECT
         (SELECT COUNT(*) FROM results_v1) as v1_count,
         (SELECT COUNT(*) FROM results_v2) as v2_count
     store_last_results: true
   ```
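
The `+compare_results` task stores `v1_count` and `v2_count` but leaves the comparison itself open. One possible follow-up, sketched here under the assumption that a small relative difference between versions is acceptable, is a `py>` script that fails the task when the counts diverge. The function names and the 1% default tolerance are hypothetical.

```python
def counts_match(v1_count, v2_count, tolerance=0.01):
    """Return True if v2 differs from v1 by at most the relative tolerance."""
    v1_count = int(v1_count)
    v2_count = int(v2_count)
    if v1_count == 0:
        # No baseline rows: only an empty v2 table counts as a match
        return v2_count == 0
    return abs(v2_count - v1_count) / v1_count <= tolerance


def compare(v1_count, v2_count, tolerance=0.01):
    """Raise if the two versions disagree, so the workflow task fails."""
    if not counts_match(v1_count, v2_count, tolerance):
        raise ValueError(
            f"Row counts diverge: v1={v1_count}, v2={v2_count}"
        )
    return {"status": "passed"}
```

Row counts are a coarse signal; matching counts do not prove the two versions produce identical rows.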

### 9. Resource Optimization

**Query resource management:**
```yaml
+large_query:
  td>: queries/heavy_processing.sql
  # Set query priority (-2 = very low, 0 = normal, 2 = very high)
  priority: 0

  # Set the destination for query results
  result_connection: ${td.database}:result_table

  # Engine settings
  engine: presto
  engine_version: stable
```

**Parallel task optimization:**
```yaml
# Limit parallelism to avoid resource exhaustion
+process_many:
  for_each>:
    batch: ["batch_1", "batch_2", "batch_3", "batch_4", "batch_5"]
  _parallel:
    limit: 2  # Only run 2 tasks in parallel
  _do:
    +process_batch:
      td>: queries/process_batch.sql
      create_table: ${batch}_results
```

### 10. Monitoring and Metrics

**Collect workflow metrics:**
```yaml
+workflow_start:
  py>: scripts.metrics.record_start
  workflow: ${workflow_name}
  session: ${session_id}

+main_work:
  td>: queries/main_query.sql

+workflow_end:
  py>: scripts.metrics.record_completion
  workflow: ${workflow_name}
  session: ${session_id}
  duration: ${session_duration}

_error:
  +record_failure:
    py>: scripts.metrics.record_failure
    workflow: ${workflow_name}
    session: ${session_id}
```

**Metrics tracking script:**
```python
import pytd
from datetime import datetime

# NOTE: values are interpolated directly into the SQL text, so only pass
# trusted workflow variables to these functions.


def record_start(workflow, session):
    """Insert a 'running' row for this workflow session."""
    client = pytd.Client(database='monitoring')
    client.query(f"""
        INSERT INTO workflow_metrics
        VALUES (
            '{workflow}',
            '{session}',
            {int(datetime.now().timestamp())},
            NULL,
            'running'
        )
    """)


def record_completion(workflow, session, duration):
    """Mark the session row as completed (duration is accepted but not stored)."""
    client = pytd.Client(database='monitoring')
    client.query(f"""
        UPDATE workflow_metrics
        SET end_time = {int(datetime.now().timestamp())},
            status = 'completed'
        WHERE workflow = '{workflow}'
          AND session_id = '{session}'
    """)
```

## Common Issues and Solutions

### Issue: Workflow Runs Too Long

**Solutions:**
1. Break into smaller parallel tasks
2. Optimize queries (add time filters, use APPROX functions)
3. Use incremental processing instead of full refresh
4. Consider Presto instead of Hive for faster execution
5. Add indexes if querying external databases

### Issue: Frequent Timeouts

**Solutions:**
```yaml
+long_running_query:
  td>: queries/complex_analysis.sql
  timeout: 3600s  # Increase timeout to 1 hour
  _retry:
    limit: 2
    interval: 300  # Seconds to wait between retries
```

### Issue: Intermittent Failures

**Solutions:**
```yaml
+flaky_task:
  td>: queries/external_api_call.sql
  _retry:
    limit: 5
    interval: 60  # Base wait in seconds
    interval_type: exponential  # Exponential backoff
```
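
To see what exponential backoff means for total delay, the arithmetic can be worked through directly. The sketch below assumes the wait before each retry is the base wait multiplied by 2 for every previous retry, using the 60-second base and 5 retries from the example above. The function name `backoff_schedule` is hypothetical, and the exact schedule the workflow engine applies may differ.

```python
def backoff_schedule(retries, base_wait, multiplier=2.0):
    """Return the wait (in seconds) before each retry, growing geometrically."""
    return [base_wait * multiplier ** attempt for attempt in range(retries)]


waits = backoff_schedule(retries=5, base_wait=60)
print(waits)       # [60.0, 120.0, 240.0, 480.0, 960.0]
print(sum(waits))  # 1860.0 seconds, i.e. 31 minutes of waiting in the worst case
```

A task that exhausts all five retries therefore adds about half an hour to the run before it finally fails, which is worth weighing against any downstream schedule.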

### Issue: Data Not Available

**Solutions:**
```yaml
+wait_for_data:
  sh>: |
    # Wait up to 30 minutes for data
    for i in {1..30}; do
      COUNT=$(tdx query -d analytics "SELECT COUNT(*) FROM source WHERE date='${session_date}'" --format csv | tail -1)
      if [ "$COUNT" -gt 0 ]; then
        exit 0
      fi
      sleep 60
    done
    exit 1

+process_data:
  td>: queries/process.sql
```
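
The same wait-and-poll logic can be expressed in Python when a `py>` task is preferred over a shell loop. This is a minimal sketch: `has_data` is a hypothetical callable supplied by the caller (for example, one that runs the row-count query above), and the `sleep` parameter exists only so the loop can be exercised without real delays.

```python
import time


def wait_for_data(has_data, attempts=30, interval_seconds=60, sleep=time.sleep):
    """Poll has_data() until it returns True or the attempts run out.

    Returns True as soon as data is seen, False if every attempt fails.
    """
    for attempt in range(attempts):
        if has_data():
            return True
        # Skip the pause after the final attempt; nothing is left to wait for
        if attempt < attempts - 1:
            sleep(interval_seconds)
    return False


def main():
    # Placeholder check; replace with a real row-count query
    if not wait_for_data(lambda: False, attempts=1):
        raise RuntimeError("Source data did not arrive in time")
```

Raising in `main` when the wait fails makes the task fail, so downstream processing does not start on missing data.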

### Issue: Out of Memory

**Solutions:**
1. Reduce query complexity
2. Add better filters to reduce data volume
3. Use sampling for analysis
4. Split into multiple smaller queries
5. Increase query resources (contact TD admin)

### Issue: Duplicate Runs

**Solutions:**
```yaml
# Use idempotent operations
+safe_insert:
  td>:
  query: |
    DELETE FROM target_table
    WHERE date = '${session_date}';

    INSERT INTO target_table
    SELECT * FROM source_table
    WHERE date = '${session_date}'
```

## Best Practices

1. **Implement comprehensive error handling** for all critical tasks
2. **Add logging** at key workflow stages
3. **Monitor query performance** regularly
4. **Set up alerts** for failures and SLA violations
5. **Use idempotent operations** to handle reruns safely
6. **Document workflow dependencies** clearly
7. **Implement data quality checks** after processing
8. **Keep workflows modular** for easier maintenance
9. **Version control workflows** in git
10. **Test changes** in dev environment first
11. **Monitor resource usage** and optimize
12. **Set appropriate timeouts** and retries
13. **Use meaningful task names** for debugging
14. **Archive old workflow versions** for rollback capability

## Maintenance Checklist

Weekly:
- Review failed workflow sessions
- Check query performance trends
- Monitor resource utilization
- Review alert patterns

Monthly:
- Clean up old temporary tables
- Review and optimize slow workflows
- Update documentation
- Review and update dependencies
- Check for deprecated features

Quarterly:
- Performance audit of all workflows
- Review workflow architecture
- Update error handling patterns
- Security review (secrets, access)

## Resources

- TD Console: Access workflow logs and monitoring
- Treasure Workflow Quick Start: https://docs.treasuredata.com/articles/#!pd/treasure-workflow-quick-start-using-td-toolbelt-in-a-cli
- tdx CLI: Command-line workflow management using `tdx wf` commands
- Query performance: Use EXPLAIN for query optimization
- Internal docs: Check TD internal documentation for updates

## tdx Workflow Command Reference

| Command | Description |
|---------|-------------|
| `tdx wf projects` | List all workflow projects |
| `tdx wf workflows [project]` | List workflows (optionally for a project) |
| `tdx wf run <project>.<workflow>` | Immediately run a workflow, returns attempt_id |
| `tdx wf sessions [project]` | List workflow sessions |
| `tdx wf attempts [project]` | List workflow attempts |
| `tdx wf attempt <id>` | Show attempt details |
| `tdx wf attempt <id> tasks` | Show tasks for an attempt |
| `tdx wf attempt <id> logs [+task]` | View task logs (interactive selector if no task specified) |
| `tdx wf attempt <id> kill` | Kill a running attempt |
| `tdx wf attempt <id> retry` | Retry an attempt |
| `tdx wf download <project>` | Download workflow project |
| `tdx wf push <project>` | Push workflow to TD |
| `tdx wf delete <project>` | Delete workflow project |