Files
2025-11-30 09:03:06 +08:00

527 lines
13 KiB
Markdown

---
name: workflow-management
description: Expert assistance for managing, debugging, monitoring, and optimizing Treasure Data workflows. Use this skill when users need help troubleshooting workflow failures, improving performance, or implementing workflow best practices.
---
# Treasure Workflow Management Expert
Expert assistance for managing and optimizing Treasure Workflow (Treasure Data's workflow orchestration tool).
## When to Use This Skill
Use this skill when:
- Debugging workflow failures or errors
- Optimizing workflow performance
- Monitoring workflow execution
- Implementing workflow alerting and notifications
- Managing workflow dependencies
- Troubleshooting scheduling issues
- Performing workflow maintenance and updates
## Core Management Tasks
### 1. Workflow Monitoring
**Check workflow status:**
```bash
# List all workflow projects
tdx wf projects
# Show workflows in a specific project
tdx wf workflows <project_name>
# Immediately run a workflow and get attempt_id for monitoring
tdx wf run <project_name>.<workflow_name>
# Output: "Started session attempt_id: 12345678"
# Use returned attempt_id to monitor task status
tdx wf attempt 12345678 tasks
# View logs for specific tasks
tdx wf attempt 12345678 logs +task_name
# List recent runs (sessions)
tdx wf sessions <project_name>
# Filter sessions by status
tdx wf sessions <project_name> --status error
tdx wf sessions <project_name> --status running
# View specific attempt details
tdx wf attempt <attempt_id>
```
### 2. Debugging Failed Workflows
**Investigate failure:**
```bash
# Get attempt details
tdx wf attempt <attempt_id>
# Show tasks for an attempt
tdx wf attempt <attempt_id> tasks
# View task logs
tdx wf attempt <attempt_id> logs +task_name
# Include subtasks in task list
tdx wf attempt <attempt_id> tasks --include-subtasks
```
**Common debugging steps:**
1. **Check error message** in logs
2. **Verify query syntax** if td> operator failed
3. **Check time ranges** - ensure data exists for session date
4. **Validate dependencies** - check if upstream tasks completed
5. **Review parameter values** - verify session variables are correct
6. **Check resource limits** - query memory, timeout issues
### 3. Query Performance Issues
**Identify slow queries:**
```yaml
+monitor_query:
td>: queries/analysis.sql
# Add job monitoring
store_last_results: true
+check_performance:
py>: scripts.check_query_performance.main
job_id: ${td.last_job_id}
```
**Optimization checklist:**
- Add time filters (TD_TIME_RANGE)
- Use approximate aggregations (APPROX_DISTINCT)
- Reduce JOIN complexity
- Select only needed columns
- Add query hints for large joins
- Consider breaking into smaller tasks
- Use appropriate engine (Presto vs Hive)
### 4. Workflow Alerting
**Slack notification on failure:**
```yaml
+critical_task:
td>: queries/important_analysis.sql
_error:
+send_slack_alert:
sh>: |
curl -X POST ${secret:slack.webhook_url} \
-H 'Content-Type: application/json' \
-d '{
"text": "Workflow failed: '"${workflow_name}"'",
"attachments": [{
"color": "danger",
"fields": [
{"title": "Session", "value": "'"${session_id}"'", "short": true},
{"title": "Date", "value": "'"${session_date}"'", "short": true}
]
}]
}'
```
**Email notification:**
```yaml
+notify_completion:
py>: scripts.notifications.send_email
recipients: ["team@example.com"]
subject: "Workflow ${workflow_name} completed"
body: "Session ${session_id} completed successfully"
_error:
+notify_failure:
py>: scripts.notifications.send_email
recipients: ["oncall@example.com"]
subject: "ALERT: Workflow ${workflow_name} failed"
body: "Session ${session_id} failed. Check logs immediately."
```
### 5. Data Quality Checks
**Implement validation tasks:**
```yaml
+main_processing:
td>: queries/process_data.sql
create_table: processed_data
+validate_results:
td>:
query: |
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT user_id) as unique_users,
SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) as null_users
FROM processed_data
store_last_results: true
+check_quality:
py>: scripts.data_quality.validate
total_rows: ${td.last_results.total_rows}
null_users: ${td.last_results.null_users}
# Script should fail if quality checks don't pass
```
Python validation script:
```python
def validate(total_rows, null_users):
"""Validate data quality"""
if total_rows == 0:
raise Exception("No data processed")
if null_users > total_rows * 0.01: # More than 1% nulls
raise Exception(f"Too many null users: {null_users}")
return {"status": "passed"}
```
### 6. Dependency Management
**Workflow dependencies:**
```yaml
# workflows/upstream.dig
+produce_data:
td>: queries/create_source_data.sql
create_table: source_data_${session_date_compact}
```
```yaml
# workflows/downstream.dig
schedule:
daily>: 04:00:00 # Runs after upstream (3:00)
_export:
requires:
- upstream_workflow # Wait for upstream completion
+consume_data:
td>:
query: |
SELECT * FROM source_data_${session_date_compact}
create_table: processed_data
```
**Manual dependency with polling:**
```yaml
+wait_for_upstream:
sh>: |
for i in {1..60}; do
if tdx describe production_db.source_data_${session_date_compact}; then
exit 0
fi
sleep 60
done
exit 1
retry: 3
+process_dependent_data:
td>: queries/dependent_processing.sql
```
### 7. Backfill Operations
**Backfill for date range:**
Use the `tdx wf attempt <id> retry` command to re-run workflows for specific attempts, or use the TD Console to trigger manual runs with custom parameters.
```bash
# Retry an attempt
tdx wf attempt <attempt_id> retry
# Retry from a specific task
tdx wf attempt <attempt_id> retry --resume-from +step_name
# Retry with parameter overrides
tdx wf attempt <attempt_id> retry --params '{"session_date":"2024-01-15"}'
```
**Backfill workflow pattern:**
```yaml
# backfill.dig
+backfill:
loop>:
dates:
- 2024-01-01
- 2024-01-02
- 2024-01-03
# ... more dates
_do:
+process_date:
call>: main_workflow.dig
params:
session_date: ${dates}
```
### 8. Workflow Versioning
**Best practices for updates:**
1. **Test in development environment first**
2. **Use version comments:**
```yaml
# Version: 2.1.0
# Changes: Added data quality validation
# Date: 2024-01-15
timezone: Asia/Tokyo
```
3. **Keep backup of working version:**
```bash
# Download current version from TD before making changes
tdx wf download my_workflow ./backup
# Or create local backup
cp workflow.dig workflow.dig.backup.$(date +%Y%m%d)
```
4. **Gradual rollout for critical workflows:**
```yaml
# Run new version in parallel with old version
+new_version:
td>: queries/new_processing.sql
create_table: results_v2
+old_version:
td>: queries/old_processing.sql
create_table: results_v1
+compare_results:
td>:
query: |
SELECT
(SELECT COUNT(*) FROM results_v1) as v1_count,
(SELECT COUNT(*) FROM results_v2) as v2_count
store_last_results: true
```
### 9. Resource Optimization
**Query resource management:**
```yaml
+large_query:
td>: queries/heavy_processing.sql
# Set query priority (lower = higher priority)
priority: 0
# Set result output size
result_connection: ${td.database}:result_table
# Engine settings
engine: presto
engine_version: stable
```
**Parallel task optimization:**
```yaml
# Limit parallelism to avoid resource exhaustion
+process_many:
for_each>:
batch: ["batch_1", "batch_2", "batch_3", "batch_4", "batch_5"]
_parallel:
limit: 2 # Only run 2 tasks in parallel
_do:
+process_batch:
td>: queries/process_batch.sql
create_table: ${batch}_results
```
### 10. Monitoring and Metrics
**Collect workflow metrics:**
```yaml
+workflow_start:
py>: scripts.metrics.record_start
workflow: ${workflow_name}
session: ${session_id}
+main_work:
td>: queries/main_query.sql
+workflow_end:
py>: scripts.metrics.record_completion
workflow: ${workflow_name}
session: ${session_id}
duration: ${session_duration}
_error:
+record_failure:
py>: scripts.metrics.record_failure
workflow: ${workflow_name}
session: ${session_id}
```
**Metrics tracking script:**
```python
import pytd
from datetime import datetime
def record_start(workflow, session):
client = pytd.Client(database='monitoring')
client.query(f"""
INSERT INTO workflow_metrics
VALUES (
'{workflow}',
'{session}',
{int(datetime.now().timestamp())},
NULL,
'running'
)
""")
def record_completion(workflow, session, duration):
client = pytd.Client(database='monitoring')
client.query(f"""
UPDATE workflow_metrics
SET end_time = {int(datetime.now().timestamp())},
status = 'completed'
WHERE workflow = '{workflow}'
AND session_id = '{session}'
""")
```
## Common Issues and Solutions
### Issue: Workflow Runs Too Long
**Solutions:**
1. Break into smaller parallel tasks
2. Optimize queries (add time filters, use APPROX functions)
3. Use incremental processing instead of full refresh
4. Consider Presto instead of Hive for faster execution
5. Add indexes if querying external databases
### Issue: Frequent Timeouts
**Solutions:**
```yaml
+long_running_query:
td>: queries/complex_analysis.sql
timeout: 3600s # Increase timeout to 1 hour
retry: 2
retry_wait: 300s
```
### Issue: Intermittent Failures
**Solutions:**
```yaml
+flaky_task:
td>: queries/external_api_call.sql
retry: 5
retry_wait: 60s
retry_wait_multiplier: 2.0 # Exponential backoff
```
### Issue: Data Not Available
**Solutions:**
```yaml
+wait_for_data:
sh>: |
# Wait up to 30 minutes for data
for i in {1..30}; do
COUNT=$(tdx query -d analytics "SELECT COUNT(*) FROM source WHERE date='${session_date}'" --format csv | tail -1)
if [ "$COUNT" -gt 0 ]; then
exit 0
fi
sleep 60
done
exit 1
+process_data:
td>: queries/process.sql
```
### Issue: Out of Memory
**Solutions:**
1. Reduce query complexity
2. Add better filters to reduce data volume
3. Use sampling for analysis
4. Split into multiple smaller queries
5. Increase query resources (contact TD admin)
### Issue: Duplicate Runs
**Solutions:**
```yaml
# Use idempotent operations
+safe_insert:
td>:
query: |
DELETE FROM target_table
WHERE date = '${session_date}';
INSERT INTO target_table
SELECT * FROM source_table
WHERE date = '${session_date}'
```
## Best Practices
1. **Implement comprehensive error handling** for all critical tasks
2. **Add logging** at key workflow stages
3. **Monitor query performance** regularly
4. **Set up alerts** for failures and SLA violations
5. **Use idempotent operations** to handle reruns safely
6. **Document workflow dependencies** clearly
7. **Implement data quality checks** after processing
8. **Keep workflows modular** for easier maintenance
9. **Version control workflows** in git
10. **Test changes** in dev environment first
11. **Monitor resource usage** and optimize
12. **Set appropriate timeouts** and retries
13. **Use meaningful task names** for debugging
14. **Archive old workflow versions** for rollback capability
## Maintenance Checklist
Weekly:
- Review failed workflow sessions
- Check query performance trends
- Monitor resource utilization
- Review alert patterns
Monthly:
- Clean up old temporary tables
- Review and optimize slow workflows
- Update documentation
- Review and update dependencies
- Check for deprecated features
Quarterly:
- Performance audit of all workflows
- Review workflow architecture
- Update error handling patterns
- Security review (secrets, access)
## Resources
- TD Console: Access workflow logs and monitoring
- Treasure Workflow Quick Start: https://docs.treasuredata.com/articles/#!pd/treasure-workflow-quick-start-using-td-toolbelt-in-a-cli
- tdx CLI: Command-line workflow management using `tdx wf` commands
- Query performance: Use EXPLAIN for query optimization
- Internal docs: Check TD internal documentation for updates
## tdx Workflow Command Reference
| Command | Description |
|---------|-------------|
| `tdx wf projects` | List all workflow projects |
| `tdx wf workflows [project]` | List workflows (optionally for a project) |
| `tdx wf run <project>.<workflow>` | Immediately run a workflow, returns attempt_id |
| `tdx wf sessions [project]` | List workflow sessions |
| `tdx wf attempts [project]` | List workflow attempts |
| `tdx wf attempt <id>` | Show attempt details |
| `tdx wf attempt <id> tasks` | Show tasks for an attempt |
| `tdx wf attempt <id> logs [+task]` | View task logs (interactive selector if no task specified) |
| `tdx wf attempt <id> kill` | Kill a running attempt |
| `tdx wf attempt <id> retry` | Retry an attempt |
| `tdx wf download <project>` | Download workflow project |
| `tdx wf push <project>` | Push workflow to TD |
| `tdx wf delete <project>` | Delete workflow project |