name: workflow-management
description: Expert assistance for managing, debugging, monitoring, and optimizing Treasure Data workflows. Use this skill when users need help troubleshooting workflow failures, improving performance, or implementing workflow best practices.

Treasure Workflow Management Expert

Expert assistance for managing and optimizing Treasure Workflow (Treasure Data's workflow orchestration tool).

When to Use This Skill

Use this skill when:

  • Debugging workflow failures or errors
  • Optimizing workflow performance
  • Monitoring workflow execution
  • Implementing workflow alerting and notifications
  • Managing workflow dependencies
  • Troubleshooting scheduling issues
  • Performing workflow maintenance and updates

Core Management Tasks

1. Workflow Monitoring

Check workflow status:

# List all workflow projects
tdx wf projects

# Show workflows in a specific project
tdx wf workflows <project_name>

# Immediately run a workflow and get attempt_id for monitoring
tdx wf run <project_name>.<workflow_name>
# Output: "Started session attempt_id: 12345678"

# Use returned attempt_id to monitor task status
tdx wf attempt 12345678 tasks

# View logs for specific tasks
tdx wf attempt 12345678 logs +task_name

# List recent runs (sessions)
tdx wf sessions <project_name>

# Filter sessions by status
tdx wf sessions <project_name> --status error
tdx wf sessions <project_name> --status running

# View specific attempt details
tdx wf attempt <attempt_id>

2. Debugging Failed Workflows

Investigate failure:

# Get attempt details
tdx wf attempt <attempt_id>

# Show tasks for an attempt
tdx wf attempt <attempt_id> tasks

# View task logs
tdx wf attempt <attempt_id> logs +task_name

# Include subtasks in task list
tdx wf attempt <attempt_id> tasks --include-subtasks

Common debugging steps (a triage sketch follows the list):

  1. Check error message in logs
  2. Verify query syntax if td> operator failed
  3. Check time ranges - ensure data exists for session date
  4. Validate dependencies - check if upstream tasks completed
  5. Review parameter values - verify session variables are correct
  6. Check resource limits - query memory, timeout issues
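
These steps can be scripted. A minimal triage sketch in Python, assuming the tdx CLI shown above is installed and authenticated, and that failing tasks are marked with an "error" state in the task list output (an assumption about the output format):

import subprocess
import sys

def failed_tasks(attempt_id):
    """Return task-list lines that mention an error state."""
    out = subprocess.run(
        ["tdx", "wf", "attempt", str(attempt_id), "tasks"],
        capture_output=True, text=True, check=True,
    ).stdout
    return [line for line in out.splitlines() if "error" in line.lower()]

if __name__ == "__main__":
    # Usage: python triage.py 12345678
    for line in failed_tasks(sys.argv[1]):
        print(line)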

3. Query Performance Issues

Identify slow queries:

+monitor_query:
  td>: queries/analysis.sql
  # td> exports ${td.last_job_id} so the next task can inspect the job
  store_last_results: true

+check_performance:
  py>: scripts.check_query_performance.main
  job_id: ${td.last_job_id}
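
The referenced scripts.check_query_performance module is not shown; a minimal hypothetical sketch using the tdclient library, which fetches the finished job and fails the task if it did not succeed (extend it with duration or cost thresholds as needed):

import os
import tdclient

def main(job_id):
    """Inspect the TD job recorded by the previous task."""
    # Assumes TD_API_KEY is available in the task's environment
    with tdclient.Client(apikey=os.environ["TD_API_KEY"]) as client:
        job = client.job(str(job_id))
        print(f"job {job_id}: status={job.status()}")
        if job.status() != "success":
            raise RuntimeError(f"job {job_id} finished with status {job.status()}")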

Optimization checklist (a query sketch follows the list):

  • Add time filters (TD_TIME_RANGE)
  • Use approximate aggregations (APPROX_DISTINCT)
  • Reduce JOIN complexity
  • Select only needed columns
  • Add query hints for large joins
  • Consider breaking into smaller tasks
  • Use appropriate engine (Presto vs Hive)
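
For the first two items, a hedged pytd sketch (the database and table names are hypothetical, and TD_API_KEY is read from the environment):

import pytd

client = pytd.Client(database="analytics")  # hypothetical database
result = client.query("""
    SELECT APPROX_DISTINCT(user_id) AS approx_users
    FROM events
    -- TD_TIME_RANGE lets the engine prune time partitions,
    -- so only one day of data is scanned
    WHERE TD_TIME_RANGE(time, '2024-01-15', '2024-01-16', 'UTC')
""")
print(result["data"])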

4. Workflow Alerting

Slack notification on failure:

+critical_task:
  td>: queries/important_analysis.sql

  _error:
    +send_slack_alert:
      sh>: |
        curl -X POST ${secret:slack.webhook_url} \
        -H 'Content-Type: application/json' \
        -d '{
          "text": "Workflow failed: '"${workflow_name}"'",
          "attachments": [{
            "color": "danger",
            "fields": [
              {"title": "Session", "value": "'"${session_id}"'", "short": true},
              {"title": "Date", "value": "'"${session_date}"'", "short": true}
            ]
          }]
        }'

Email notification:

+notify_completion:
  py>: scripts.notifications.send_email
  recipients: ["team@example.com"]
  subject: "Workflow ${workflow_name} completed"
  body: "Session ${session_id} completed successfully"

  _error:
    +notify_failure:
      py>: scripts.notifications.send_email
      recipients: ["oncall@example.com"]
      subject: "ALERT: Workflow ${workflow_name} failed"
      body: "Session ${session_id} failed. Check logs immediately."

5. Data Quality Checks

Implement validation tasks:

+main_processing:
  td>: queries/process_data.sql
  create_table: processed_data

+validate_results:
  td>:
    query: |
      SELECT
        COUNT(*) as total_rows,
        COUNT(DISTINCT user_id) as unique_users,
        SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) as null_users
      FROM processed_data
  store_last_results: true

+check_quality:
  py>: scripts.data_quality.validate
  total_rows: ${td.last_results.total_rows}
  null_users: ${td.last_results.null_users}
  # Script should fail if quality checks don't pass

Python validation script:

def validate(total_rows, null_users):
    """Fail the task if data quality checks don't pass."""
    # Parameters may arrive as strings, so coerce before comparing
    total_rows = int(total_rows)
    null_users = int(null_users)

    if total_rows == 0:
        raise ValueError("No data processed")

    if null_users > total_rows * 0.01:  # more than 1% nulls
        raise ValueError(f"Too many null users: {null_users} of {total_rows}")

    return {"status": "passed"}

6. Dependency Management

Workflow dependencies:

# workflows/upstream.dig
+produce_data:
  td>: queries/create_source_data.sql
  create_table: source_data_${session_date_compact}

# workflows/downstream.dig
schedule:
  daily>: 04:00:00  # Runs after the upstream schedule (03:00)

+wait_for_upstream:
  # require> blocks until upstream_workflow has a successful
  # session for the same session time
  require>: upstream_workflow

+consume_data:
  td>:
    query: |
      SELECT * FROM source_data_${session_date_compact}
  create_table: processed_data

Manual dependency with polling:

+wait_for_upstream:
  sh>: |
    # Poll for up to an hour; seq instead of {1..60} since sh> runs /bin/sh
    for i in $(seq 1 60); do
      if tdx describe production_db.source_data_${session_date_compact}; then
        exit 0
      fi
      sleep 60
    done
    exit 1
  _retry: 3

+process_dependent_data:
  td>: queries/dependent_processing.sql

7. Backfill Operations

Backfill for date range:

Use the tdx wf attempt <id> retry command to re-run a specific attempt, or use the TD Console to trigger manual runs with custom parameters.

# Retry an attempt
tdx wf attempt <attempt_id> retry

# Retry from a specific task
tdx wf attempt <attempt_id> retry --resume-from +step_name

# Retry with parameter overrides
tdx wf attempt <attempt_id> retry --params '{"session_date":"2024-01-15"}'

Backfill workflow pattern:

# backfill.dig
+backfill:
  for_each>:
    date:
      - 2024-01-01
      - 2024-01-02
      - 2024-01-03
      # ... more dates
  _do:
    +process_date:
      # call> runs within the current session, so pass the backfill
      # date as an exported variable the called workflow reads
      _export:
        target_date: ${date}
      call>: main_workflow.dig

8. Workflow Versioning

Best practices for updates:

  1. Test in development environment first
  2. Use version comments:
# Version: 2.1.0
# Changes: Added data quality validation
# Date: 2024-01-15

timezone: Asia/Tokyo
  3. Keep a backup of the working version:
# Download current version from TD before making changes
tdx wf download my_workflow ./backup

# Or create local backup
cp workflow.dig workflow.dig.backup.$(date +%Y%m%d)
  4. Gradual rollout for critical workflows:
# Run new version in parallel with old version
+new_version:
  td>: queries/new_processing.sql
  create_table: results_v2

+old_version:
  td>: queries/old_processing.sql
  create_table: results_v1

+compare_results:
  td>:
    query: |
      SELECT
        (SELECT COUNT(*) FROM results_v1) as v1_count,
        (SELECT COUNT(*) FROM results_v2) as v2_count
  store_last_results: true

9. Resource Optimization

Query resource management:

+large_query:
  td>: queries/heavy_processing.sql
  # Set query priority (-2 = lowest, 2 = highest)
  priority: 0

  # Write query results to a result connection
  result_connection: ${td.database}:result_table

  # Engine settings
  engine: presto
  engine_version: stable

Parallel task optimization:

# Limit parallelism to avoid resource exhaustion
+process_many:
  for_each>:
    batch: ["batch_1", "batch_2", "batch_3", "batch_4", "batch_5"]
  _parallel:
    limit: 2  # Only run 2 tasks in parallel
  _do:
    +process_batch:
      td>: queries/process_batch.sql
      create_table: ${batch}_results

10. Monitoring and Metrics

Collect workflow metrics:

+workflow_start:
  py>: scripts.metrics.record_start
  workflow: ${workflow_name}
  session: ${session_id}

+main_work:
  td>: queries/main_query.sql

+workflow_end:
  py>: scripts.metrics.record_completion
  workflow: ${workflow_name}
  session: ${session_id}

  _error:
    +record_failure:
      py>: scripts.metrics.record_failure
      workflow: ${workflow_name}
      session: ${session_id}

Metrics tracking script:

import pytd
from datetime import datetime

def record_start(workflow, session):
    client = pytd.Client(database='monitoring')
    client.query(f"""
        INSERT INTO workflow_metrics
        VALUES ('{workflow}', '{session}',
                {int(datetime.now().timestamp())}, NULL, 'running')
    """)

def record_completion(workflow, session):
    # TD tables are append-only (Presto UPDATE is not supported),
    # so record completion as a new event row and take the latest
    # row per session when reading the table
    client = pytd.Client(database='monitoring')
    client.query(f"""
        INSERT INTO workflow_metrics
        VALUES ('{workflow}', '{session}', NULL,
                {int(datetime.now().timestamp())}, 'completed')
    """)

Common Issues and Solutions

Issue: Workflow Runs Too Long

Solutions:

  1. Break into smaller parallel tasks
  2. Optimize queries (add time filters, use APPROX functions)
  3. Use incremental processing instead of a full refresh (see the sketch after this list)
  4. Consider Presto instead of Hive for faster execution
  5. Add indexes if querying external databases
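
For item 3, a minimal incremental-processing sketch with pytd; the database, tables, and dates are hypothetical:

import pytd

client = pytd.Client(database="analytics")  # hypothetical database
client.query("""
    INSERT INTO daily_summary
    SELECT TD_TIME_STRING(time, 'd!') AS day, COUNT(*) AS events
    FROM events
    -- process only one day instead of a full refresh
    WHERE TD_TIME_RANGE(time, '2024-01-15', '2024-01-16', 'UTC')
    GROUP BY 1
""")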

Issue: Frequent Timeouts

Solutions:

+long_running_query:
  td>: queries/complex_analysis.sql
  # Retry transient timeouts, waiting 5 minutes between attempts
  _retry:
    limit: 2
    interval: 300

For a hard deadline, set a workflow-level sla: directive or split the query into smaller tasks; Digdag has no per-task timeout option.

Issue: Intermittent Failures

Solutions:

+flaky_task:
  td>: queries/external_api_call.sql
  # Exponential backoff: 60s, 120s, 240s, ...
  _retry:
    limit: 5
    interval: 60
    interval_type: exponential

Issue: Data Not Available

Solutions:

+wait_for_data:
  sh>: |
# Wait up to 30 minutes; seq instead of {1..30} since sh> runs /bin/sh
    for i in $(seq 1 30); do
      COUNT=$(tdx query -d analytics "SELECT COUNT(*) FROM source WHERE date='${session_date}'" --format csv | tail -1)
      if [ "$COUNT" -gt 0 ]; then
        exit 0
      fi
      sleep 60
    done
    exit 1

+process_data:
  td>: queries/process.sql

Issue: Out of Memory

Solutions:

  1. Reduce query complexity
  2. Add better filters to reduce data volume
  3. Use sampling for analysis (see the sketch after this list)
  4. Split into multiple smaller queries
  5. Increase query resources (contact TD admin)
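
For item 3, a hedged sketch of sampled analysis using Presto's TABLESAMPLE; the table name is hypothetical:

import pytd

client = pytd.Client(database="analytics")  # hypothetical database
result = client.query("""
    -- scan roughly 1% of rows and scale the count back up
    SELECT COUNT(*) * 100 AS estimated_rows
    FROM events TABLESAMPLE BERNOULLI (1)
""")
print(result["data"])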

Issue: Duplicate Runs

Solutions:

# Use idempotent operations: delete the partition, then re-insert.
# Presto runs one statement per query, so split into two tasks.
+delete_existing:
  td>:
    query: |
      DELETE FROM target_table
      WHERE date = '${session_date}'

+safe_insert:
  td>:
    query: |
      INSERT INTO target_table
      SELECT * FROM source_table
      WHERE date = '${session_date}'

Best Practices

  1. Implement comprehensive error handling for all critical tasks
  2. Add logging at key workflow stages
  3. Monitor query performance regularly
  4. Set up alerts for failures and SLA violations
  5. Use idempotent operations to handle reruns safely
  6. Document workflow dependencies clearly
  7. Implement data quality checks after processing
  8. Keep workflows modular for easier maintenance
  9. Version control workflows in git
  10. Test changes in dev environment first
  11. Monitor resource usage and optimize
  12. Set appropriate timeouts and retries
  13. Use meaningful task names for debugging
  14. Archive old workflow versions for rollback capability

Maintenance Checklist

Weekly:

  • Review failed workflow sessions
  • Check query performance trends
  • Monitor resource utilization
  • Review alert patterns

Monthly:

  • Clean up old temporary tables (see the sketch after this list)
  • Review and optimize slow workflows
  • Update documentation
  • Review and update dependencies
  • Check for deprecated features
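
A hedged sketch for the cleanup item above, listing candidate tables to review before dropping; the 'tmp' prefix and database name are assumptions:

import pytd

client = pytd.Client(database="analytics")  # assumed database
result = client.query("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'analytics'
      AND table_name LIKE 'tmp%'
""")
for row in result["data"]:
    print(f"candidate for cleanup: {row[0]}")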

Quarterly:

  • Performance audit of all workflows
  • Review workflow architecture
  • Update error handling patterns
  • Security review (secrets, access)

Resources

tdx Workflow Command Reference

Command                               Description
tdx wf projects                       List all workflow projects
tdx wf workflows [project]            List workflows (optionally for a project)
tdx wf run <project>.<workflow>       Immediately run a workflow; returns attempt_id
tdx wf sessions [project]             List workflow sessions
tdx wf attempts [project]             List workflow attempts
tdx wf attempt <id>                   Show attempt details
tdx wf attempt <id> tasks             Show tasks for an attempt
tdx wf attempt <id> logs [+task]      View task logs (interactive selector if no task specified)
tdx wf attempt <id> kill              Kill a running attempt
tdx wf attempt <id> retry             Retry an attempt
tdx wf download <project>             Download workflow project
tdx wf push <project>                 Push workflow to TD
tdx wf delete <project>               Delete workflow project