| name | digdag |
| description | Treasure Workflow (digdag) for TD. Covers .dig syntax, session variables (session_date, session_date_compact), td> operator, _parallel/_retry/_error directives, and tdx wf commands. |
# Treasure Workflow (Digdag)

## Basic Structure
```yaml
timezone: Asia/Tokyo

schedule:
  daily>: 02:00:00

_export:
  td:
    database: my_database
    engine: presto

+extract:
  td>: queries/extract.sql
  create_table: raw_data

+transform:
  td>: queries/transform.sql
  create_table: results
```
Key points:

- `.dig` extension required; the filename becomes the workflow name
- Tasks run sequentially with the `+task_name:` prefix
- `foo>: bar` is sugar for `_type: foo` and `_command: bar`
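The operator sugar can be seen by desugaring a task by hand; the two forms below are equivalent:

```yaml
# Shorthand
+extract_short:
  td>: queries/extract.sql

# Desugared: foo>: bar expands to _type: foo plus _command: bar
+extract_long:
  _type: td
  _command: queries/extract.sql
```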
## Session Variables
| Variable | Example |
|---|---|
| `${session_time}` | `2024-01-30T00:00:00+09:00` |
| `${session_date}` | `2024-01-30` |
| `${session_date_compact}` | `20240130` |
| `${session_unixtime}` | `1706540400` |
| `${last_session_date}` | Previous scheduled date |
| `${next_session_date}` | Next scheduled date |
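A common use of these variables is building per-day table names; a minimal sketch, assuming a hypothetical `queries/summary.sql`:

```yaml
+snapshot:
  td>: queries/summary.sql
  # Resolves to e.g. daily_summary_20240130
  create_table: daily_summary_${session_date_compact}
```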
Moment.js is available inside `${...}` expressions:

```yaml
+tomorrow:
  echo>: ${moment(session_time).add(1, 'days').format("YYYY-MM-DD")}
```
## TD Operator
```yaml
+query:
  td>: queries/analysis.sql
  database: analytics
  engine: presto
  create_table: results  # or insert_into: existing_table
```
Inline SQL:

```yaml
+inline:
  td>:
    query: |
      SELECT * FROM events
      WHERE TD_TIME_RANGE(time, '${session_date}', TD_TIME_ADD('${session_date}', '1d'))
```
## Parallel Execution
```yaml
+parallel_tasks:
  _parallel: true
  +task_a:
    td>: queries/a.sql
  +task_b:
    td>: queries/b.sql

+after_parallel:
  echo>: "Runs after all parallel tasks"
```
Limited concurrency:

```yaml
+limited:
  _parallel:
    limit: 2  # at most 2 child tasks run concurrently
```
## Error Handling
```yaml
+task:
  td>: queries/important.sql
  _retry: 3
  _error:
    +alert:
      sh>: python scripts/alert.py "Task failed"
```
Retry with backoff:

```yaml
+task:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential  # or constant
```
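`_error` can also be declared once at the top level of the workflow file, so that any task failure triggers it; a sketch, reusing the alert script from above:

```yaml
_error:
  +notify:
    sh>: python scripts/alert.py "Workflow failed"

+main:
  td>: queries/important.sql
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential
```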
## Variables
```yaml
_export:
  td:
    database: production
  my_param: value
  api_key: ${secret:api_credentials.key}  # TD parameter store

+task:
  py>: scripts.process.main
  param: ${my_param}
```
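Note that `_export` is scoped: variables exported inside a task group are visible only to that group and its children. A sketch:

```yaml
+group:
  _export:
    region: US
  +inner:
    echo>: ${region}  # resolves to US

+outside:
  echo>: ${region}  # fails: region is scoped to +group
```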
## Conditional & Loops
```yaml
+check:
  td>: queries/count.sql
  store_last_results: true

+if_data:
  if>: ${td.last_results.cnt > 0}
  _do:
    +process:
      td>: queries/process.sql

+loop:
  for_each>:
    region: [US, EU, ASIA]
  _do:
    +process:
      td>: queries/by_region.sql
```
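`for_each>` combines with `_parallel` to run the loop iterations concurrently:

```yaml
+loop_parallel:
  for_each>:
    region: [US, EU, ASIA]
  _parallel: true
  _do:
    +process:
      # ${region} is substituted per iteration
      td>: queries/by_region.sql
```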
## Event Triggers
```yaml
# Runs after another workflow succeeds
trigger:
  attempt>:
    dependent_workflow_name: segment_refresh
    dependent_project_name: customer_segments

+activate:
  td>: queries/activate.sql
```
## tdx wf Commands
```shell
tdx wf push my_workflow               # Push to TD
tdx wf run my_project.my_workflow     # Run immediately
tdx wf sessions my_workflow           # List runs
tdx wf attempt <id> tasks             # Show tasks
tdx wf attempt <id> logs +task_name   # View logs
tdx wf attempt <id> retry             # Retry failed
tdx wf attempt <id> kill              # Stop running
```
## Project Structure
```
my_workflow/
├── my_workflow.dig
├── queries/
│   └── analysis.sql
└── scripts/
    └── process.py
```
## Schedule Options
```yaml
schedule:
  daily>: 02:00:00
  # hourly>: 00:00
  # cron>: "0 */4 * * *"
  # weekly>: "Mon,00:00:00"
```
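The scheduler also accepts delay-handling options; a sketch (verify support against your digdag version):

```yaml
schedule:
  daily>: 02:00:00
  skip_on_overtime: false  # run even if the previous session is still running
  skip_delayed_by: 1h      # skip sessions that start more than 1h late
```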