| name | workflow-management |
| description | Expert assistance for managing, debugging, monitoring, and optimizing Treasure Data workflows. Use this skill when users need help troubleshooting workflow failures, improving performance, or implementing workflow best practices. |
Treasure Workflow Management Expert
Expert assistance for managing and optimizing Treasure Workflow (Treasure Data's workflow orchestration tool).
When to Use This Skill
Use this skill when:
- Debugging workflow failures or errors
- Optimizing workflow performance
- Monitoring workflow execution
- Implementing workflow alerting and notifications
- Managing workflow dependencies
- Troubleshooting scheduling issues
- Performing workflow maintenance and updates
Core Management Tasks
1. Workflow Monitoring
Check workflow status:
# List all workflows across projects
td wf workflows
# Show workflows in a specific project
td wf workflows <project_name>
# List recent runs
td wf sessions <project_name>
# View a specific session
td wf session <session_id>
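For task-level detail, attempt IDs are what the log and task commands expect; two additional digdag client commands passed through td wf:
# List recent attempts (attempt IDs are needed for logs and per-task status)
td wf attempts
# Show the status of each task within an attempt
td wf tasks <attempt_id>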
2. Debugging Failed Workflows
Investigate failure:
# Get session details
td wf session <session_id>
# View logs for a single task within an attempt
td wf log <attempt_id> +task_name
# Get full logs for an attempt
td wf log <attempt_id>
Common debugging steps:
- Check error message in logs
- Verify query syntax if td> operator failed
- Check time ranges - ensure data exists for session date
- Validate dependencies - check if upstream tasks completed
- Review parameter values - verify session variables are correct
- Check resource limits - query memory, timeout issues
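Once the root cause is fixed, a failed attempt can usually be resumed from the failed task rather than rerun from scratch. A sketch using the digdag retry command (check td wf retry --help for the options supported by your version; the --name value is an arbitrary unique label):
# Resume the failed attempt from the failed task, using the latest pushed revision
td wf retry <attempt_id> --resume --latest-revision --name retry-after-fix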
3. Query Performance Issues
Identify slow queries:
+monitor_query:
  td>: queries/analysis.sql
  # Add job monitoring
  store_last_results: true

+check_performance:
  py>: scripts.check_query_performance.main
  job_id: ${td.last_job_id}
Optimization checklist:
- Add time filters (TD_TIME_RANGE), as shown in the sketch after this list
- Use approximate aggregations (APPROX_DISTINCT)
- Reduce JOIN complexity
- Select only needed columns
- Add query hints for large joins
- Consider breaking into smaller tasks
- Use appropriate engine (Presto vs Hive)
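A sketch combining the first two items, assuming a Presto query over an events table with the standard time column and a user_id field:
-- queries/analysis.sql (sketch)
-- Scan only the session day and use an approximate distinct count
SELECT
  TD_TIME_FORMAT(time, 'yyyy-MM-dd', 'UTC') AS day,
  APPROX_DISTINCT(user_id) AS approx_unique_users
FROM events
WHERE TD_TIME_RANGE(time,
                    '${session_date}',
                    TD_TIME_ADD('${session_date}', '1d'),
                    'UTC')
GROUP BY 1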
4. Workflow Alerting
Slack notification on failure:
+critical_task:
  td>: queries/important_analysis.sql
  _error:
    +send_slack_alert:
      sh>: |
        curl -X POST ${secret:slack.webhook_url} \
          -H 'Content-Type: application/json' \
          -d '{
            "text": "Workflow failed: '"${workflow_name}"'",
            "attachments": [{
              "color": "danger",
              "fields": [
                {"title": "Session", "value": "'"${session_id}"'", "short": true},
                {"title": "Date", "value": "'"${session_date}"'", "short": true}
              ]
            }]
          }'
Email notification:
+notify_completion:
  py>: scripts.notifications.send_email
  recipients: ["team@example.com"]
  subject: "Workflow ${workflow_name} completed"
  body: "Session ${session_id} completed successfully"

_error:
  +notify_failure:
    py>: scripts.notifications.send_email
    recipients: ["oncall@example.com"]
    subject: "ALERT: Workflow ${workflow_name} failed"
    body: "Session ${session_id} failed. Check logs immediately."
5. Data Quality Checks
Implement validation tasks:
+main_processing:
  td>: queries/process_data.sql
  create_table: processed_data

+validate_results:
  td>:
  query: |
    SELECT
      COUNT(*) as total_rows,
      COUNT(DISTINCT user_id) as unique_users,
      SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) as null_users
    FROM processed_data
  store_last_results: true

+check_quality:
  py>: scripts.data_quality.validate
  total_rows: ${td.last_results.total_rows}
  null_users: ${td.last_results.null_users}
  # Script should fail if quality checks don't pass
Python validation script:
def validate(total_rows, null_users):
    """Validate data quality; raise to fail the workflow task."""
    # Coerce defensively: templated parameters may arrive as strings
    total_rows = int(total_rows)
    null_users = int(null_users)
    if total_rows == 0:
        raise Exception("No data processed")
    if null_users > total_rows * 0.01:  # More than 1% nulls
        raise Exception(f"Too many null users: {null_users}")
    return {"status": "passed"}
6. Dependency Management
Workflow dependencies:
# workflows/upstream.dig
+produce_data:
  td>: queries/create_source_data.sql
  create_table: source_data_${session_date_compact}
# workflows/downstream.dig
schedule:
  daily>: 04:00:00  # Scheduled after upstream (03:00)

+wait_for_upstream:
  # Block until the upstream workflow's session for the same session_time has finished
  require>: upstream_workflow
  session_time: ${session_time}

+consume_data:
  td>:
  query: |
    SELECT * FROM source_data_${session_date_compact}
  create_table: processed_data
Manual dependency with polling:
+wait_for_upstream:
  sh>: |
    # Poll for up to an hour until the upstream table exists
    for i in {1..60}; do
      if td table:show production_db source_data_${session_date_compact}; then
        exit 0
      fi
      sleep 60
    done
    exit 1
  _retry: 3

+process_dependent_data:
  td>: queries/dependent_processing.sql
7. Backfill Operations
Backfill for date range:
# Start one session per date (requires GNU date); project and workflow names are placeholders
d=2024-01-01
while [ "$d" != "2024-02-01" ]; do
  td wf start my_project my_workflow --session "$d"
  d=$(date -I -d "$d + 1 day")
done

# For scheduled workflows, td wf backfill can start past sessions in a single command
Backfill workflow pattern:
# backfill.dig
+backfill:
  for_each>:
    target_date:
      - 2024-01-01
      - 2024-01-02
      - 2024-01-03
      # ... more dates
  _do:
    +process_date:
      # the called workflow reads ${target_date} instead of ${session_date}
      call>: main_workflow.dig
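To run the backfill on the server, push the project and start a single session of it; the project name below is a placeholder:
# Push the project containing backfill.dig, then start it once
td wf push my_project
td wf start my_project backfill --session now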
8. Workflow Versioning
Best practices for updates:
- Test in development environment first
- Use version comments:
# Version: 2.1.0
# Changes: Added data quality validation
# Date: 2024-01-15
timezone: Asia/Tokyo
- Keep backup of working version:
cp workflow.dig workflow.dig.backup.$(date +%Y%m%d)
- Gradual rollout for critical workflows:
# Run new version in parallel with the old version, then compare
+run_both_versions:
  _parallel: true

  +new_version:
    td>: queries/new_processing.sql
    create_table: results_v2

  +old_version:
    td>: queries/old_processing.sql
    create_table: results_v1

+compare_results:
  td>:
  query: |
    SELECT
      (SELECT COUNT(*) FROM results_v1) as v1_count,
      (SELECT COUNT(*) FROM results_v2) as v2_count
  store_last_results: true
9. Resource Optimization
Query resource management:
+large_query:
  td>: queries/heavy_processing.sql
  # Job priority from -2 (lowest) to 2 (highest); default is 0
  priority: 0
  # Optional: export results to a saved result connection (placeholder name)
  # result_connection: my_result_connection
  # Engine settings
  engine: presto
  engine_version: stable
Parallel task optimization:
# Limit parallelism to avoid resource exhaustion
+process_many:
  for_each>:
    batch: ["batch_1", "batch_2", "batch_3", "batch_4", "batch_5"]
  _parallel:
    limit: 2  # Only run 2 tasks in parallel
  _do:
    +process_batch:
      td>: queries/process_batch.sql
      create_table: ${batch}_results
10. Monitoring and Metrics
Collect workflow metrics:
+workflow_start:
  py>: scripts.metrics.record_start
  workflow: ${workflow_name}
  session: ${session_id}

+main_work:
  td>: queries/main_query.sql

+workflow_end:
  py>: scripts.metrics.record_completion
  workflow: ${workflow_name}
  session: ${session_id}

_error:
  +record_failure:
    py>: scripts.metrics.record_failure
    workflow: ${workflow_name}
    session: ${session_id}
Metrics tracking script:
import pytd
from datetime import datetime, timezone


def _record(workflow, session, status):
    # TD's Presto engine does not support UPDATE on TD tables,
    # so append one event row per status change instead
    client = pytd.Client(database='monitoring')
    now = int(datetime.now(timezone.utc).timestamp())
    client.query(f"""
        INSERT INTO workflow_metrics (time, workflow, session_id, status)
        VALUES ({now}, '{workflow}', '{session}', '{status}')
    """)


def record_start(workflow, session):
    _record(workflow, session, 'running')


def record_completion(workflow, session):
    _record(workflow, session, 'completed')


def record_failure(workflow, session):
    _record(workflow, session, 'failed')
Common Issues and Solutions
Issue: Workflow Runs Too Long
Solutions:
- Break into smaller parallel tasks
- Optimize queries (add time filters, use APPROX functions)
- Use incremental processing instead of full refresh (see the sketch after this list)
- Consider Presto instead of Hive for faster execution
- Add indexes if querying external databases
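A sketch of the incremental approach, assuming an append-only target table and the standard time column; each run scans and appends only the session day:
+incremental_load:
  td>:
  query: |
    SELECT *
    FROM raw_events
    WHERE TD_TIME_RANGE(time,
                        '${session_date}',
                        TD_TIME_ADD('${session_date}', '1d'),
                        'UTC')
  insert_into: events_processed
  engine: presto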
Issue: Frequent Timeouts
Solutions:
+long_running_query:
  td>: queries/complex_analysis.sql
  # Digdag has no per-task timeout option; use the sla: directive (section 4)
  # to alert on long runs, and retry transient failures with _retry
  _retry:
    limit: 2
    interval: 300            # seconds between attempts
    interval_type: constant
Issue: Intermittent Failures
Solutions:
+flaky_task:
  td>: queries/external_api_call.sql
  _retry:
    limit: 5
    interval: 60                 # initial wait in seconds
    interval_type: exponential   # doubles the wait on each retry
Issue: Data Not Available
Solutions:
+wait_for_data:
  sh>: |
    # Wait up to 30 minutes for the day's data to arrive
    for i in {1..30}; do
      COUNT=$(td query -w -T presto -d analytics -f csv \
        "SELECT COUNT(*) FROM source WHERE date='${session_date}'" | tail -1)
      if [ "$COUNT" -gt 0 ]; then
        exit 0
      fi
      sleep 60
    done
    exit 1

+process_data:
  td>: queries/process.sql
Issue: Out of Memory
Solutions:
- Reduce query complexity
- Add better filters to reduce data volume
- Use sampling for analysis (see the sketch after this list)
- Split into multiple smaller queries
- Increase query resources (contact TD admin)
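For the sampling approach, Presto's TABLESAMPLE can profile a large table cheaply; a sketch (the 1 percent rate and table name are arbitrary):
-- Profile roughly 1% of rows instead of scanning everything
SELECT
  COUNT(*) AS sampled_rows,
  APPROX_DISTINCT(user_id) AS approx_users
FROM big_events TABLESAMPLE BERNOULLI (1)
WHERE TD_TIME_RANGE(time, '${session_date}', TD_TIME_ADD('${session_date}', '1d'), 'UTC')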
Issue: Duplicate Runs
Solutions:
# Use idempotent operations: clear the session's partition, then re-insert it
# (Presto runs one statement per query, so the DELETE and INSERT are separate tasks)
+delete_existing:
  td>:
  query: |
    DELETE FROM target_table
    WHERE date = '${session_date}'
  engine: presto

+reinsert:
  td>:
  query: |
    INSERT INTO target_table
    SELECT * FROM source_table
    WHERE date = '${session_date}'
  engine: presto
Best Practices
- Implement comprehensive error handling for all critical tasks
- Add logging at key workflow stages
- Monitor query performance regularly
- Set up alerts for failures and SLA violations
- Use idempotent operations to handle reruns safely
- Document workflow dependencies clearly
- Implement data quality checks after processing
- Keep workflows modular for easier maintenance
- Version control workflows in git
- Test changes in dev environment first
- Monitor resource usage and optimize
- Set appropriate timeouts and retries
- Use meaningful task names for debugging
- Archive old workflow versions for rollback capability
Maintenance Checklist
Weekly:
- Review failed workflow sessions
- Check query performance trends
- Monitor resource utilization
- Review alert patterns
Monthly:
- Clean up old temporary tables
- Review and optimize slow workflows
- Update documentation
- Review and update dependencies
- Check for deprecated features
Quarterly:
- Performance audit of all workflows
- Review workflow architecture
- Update error handling patterns
- Security review (secrets, access)
Resources
- TD Console: Access workflow logs and monitoring
- Treasure Workflow Quick Start: https://docs.treasuredata.com/articles/#!pd/treasure-workflow-quick-start-using-td-toolbelt-in-a-cli
- td CLI: Command-line workflow management using td wf commands
- Query performance: Use EXPLAIN for query optimization
- Internal docs: Check TD internal documentation for updates