| name | airflow-workflows |
| description | Apache Airflow DAG design, operators, and scheduling best practices. |
# Airflow Workflows

## DAG Structure
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


def extract_function():
    """Pull source data; replace with real extraction logic."""
    ...


def load_function():
    """Load transformed data; replace with real load logic."""
    ...


default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule='0 6 * * *',  # 6 AM daily (use schedule_interval on Airflow < 2.4)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_function,
    )
    transform = SQLExecuteQueryOperator(
        task_id='transform_data',
        conn_id='warehouse',
        sql='sql/transform.sql',
    )
    load = PythonOperator(
        task_id='load_data',
        python_callable=load_function,
    )

    extract >> transform >> load
```
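A common smoke test loads the DAG folder through `DagBag` to catch import errors before deploy. A minimal sketch (the test function name is illustrative):

```python
from airflow.models import DagBag


def test_dag_loads():
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors       # no broken DAG files
    assert 'daily_etl' in dag_bag.dags     # our DAG parsed successfully
```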
## Common Operators
| Operator | Use Case |
|----------|----------|
| `PythonOperator` | Custom Python code |
| `BashOperator` | Shell commands |
| `SQLExecuteQueryOperator` | Database queries |
| `S3ToSnowflakeOperator` | Cloud data transfers |
| `DbtCloudRunJobOperator` | dbt Cloud jobs |
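A minimal sketch of a `BashOperator` task inside the DAG context above; the staging path is a hypothetical example:

```python
from airflow.operators.bash import BashOperator

# bash_command is a Jinja-templated field, so runtime variables work here too.
cleanup = BashOperator(
    task_id='cleanup_staging',
    bash_command='rm -rf /tmp/staging/{{ ds }}',
)
```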
## Best Practices
- Idempotent tasks - safe to re-run
- Small tasks - easy to debug and retry
- XCom sparingly - only for small data
- Templating - use `{{ ds }}` for dates (see the sketch after this list)
- Sensors wisely - avoid blocking workers
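A minimal sketch combining templating with idempotency, assuming a `warehouse` connection and a hypothetical `daily_summary` table. `{{ ds }}` is rendered by Jinja to the run's logical date (`YYYY-MM-DD`), so each run rebuilds exactly one day's partition:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Deleting before inserting makes the task safe to re-run for the same date.
rebuild_partition = SQLExecuteQueryOperator(
    task_id='rebuild_partition',
    conn_id='warehouse',
    sql="DELETE FROM daily_summary WHERE event_date = '{{ ds }}'",
)
```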
## Task Dependencies
```python
# Linear
task1 >> task2 >> task3

# Parallel
[task1, task2] >> task3

# Complex (fan-out, then fan-in)
task1 >> [task2, task3]
[task2, task3] >> task4
```
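For longer pipelines, the built-in `chain` helper expresses the same fan-out/fan-in pattern in one call:

```python
from airflow.models.baseoperator import chain

# Equivalent to: task1 >> [task2, task3] >> task4
chain(task1, [task2, task3], task4)
```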
## Dynamic DAGs
```python
# Inside a DAG context: one task per table, each with a unique task_id.
for table in ['users', 'orders', 'products']:
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,  # a callable accepting a `table` kwarg
        op_kwargs={'table': table},
    )
```
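On Airflow 2.3+, dynamic task mapping achieves the same fan-out at runtime with `.expand()`. A minimal sketch using the TaskFlow API (the `process` task is illustrative):

```python
from airflow.decorators import task


@task
def process(table: str):
    # One mapped task instance is created per element of the input list.
    print(f'processing {table}')


# Inside a DAG context:
process.expand(table=['users', 'orders', 'products'])
```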