| name | apache-airflow-orchestration |
| description | Complete guide for Apache Airflow orchestration including DAGs, operators, sensors, XComs, task dependencies, dynamic workflows, and production deployment |
| tags | airflow, workflow, orchestration, dags, operators, sensors, data-pipelines, scheduling |
| tier | tier-1 |
Apache Airflow Orchestration
A comprehensive skill for mastering Apache Airflow workflow orchestration. This skill covers DAG development, operators, sensors, task dependencies, dynamic workflows, XCom communication, scheduling patterns, and production deployment strategies.
When to Use This Skill
Use this skill when:
- Building and managing complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows across multiple systems
- Scheduling and monitoring batch processing jobs
- Coordinating multi-step data transformations
- Managing workflows with conditional execution and branching
- Implementing event-driven or asset-based workflows
- Deploying production-grade workflow automation
- Creating dynamic workflows that generate tasks programmatically
- Coordinating distributed task execution across clusters
- Building data engineering platforms with workflow orchestration
Core Concepts
What is Apache Airflow?
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) using Python code, making complex workflow orchestration maintainable and version-controlled.
Key Principles:
- Dynamic: Workflows are defined in Python, enabling dynamic generation
- Extensible: Rich ecosystem of operators, sensors, and hooks
- Scalable: Can scale from single machine to large clusters
- Observable: Comprehensive UI for monitoring and troubleshooting
DAGs (Directed Acyclic Graphs)
A DAG is a collection of tasks organized to reflect their relationships and dependencies.
DAG Properties:
- dag_id: Unique identifier for the DAG
- start_date: When the DAG should start being scheduled
- schedule: How often to run (cron, timedelta, or asset-based)
- catchup: Whether to run missed intervals on DAG activation
- tags: Labels for organization and filtering
- default_args: Default parameters for all tasks in the DAG
DAG Definition Example:
from datetime import datetime
from airflow.sdk import DAG
with DAG(
dag_id="example_dag",
start_date=datetime(2022, 1, 1),
schedule="0 0 * * *", # Daily at midnight
catchup=False,
tags=["example", "tutorial"],
) as dag:
# Tasks defined here
pass
Tasks and Operators
Tasks are the basic units of execution in Airflow. Operators are templates for creating tasks.
Common Operator Types:
- BashOperator: Execute bash commands
- PythonOperator: Execute Python functions
- EmailOperator: Send emails
- EmptyOperator: Placeholder/dummy tasks
- Custom Operators: User-defined operators for specific needs
Operator vs. Task:
- Operator: Template/class definition
- Task: Instantiation of an operator with specific parameters
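One operator class can be instantiated many times; each instantiation with its own task_id is a separate task. A minimal sketch (task IDs here are illustrative):
from airflow.providers.standard.operators.bash import BashOperator
# BashOperator is the template; each instantiation below is a distinct task
say_hello = BashOperator(task_id="say_hello", bash_command="echo hello")
say_goodbye = BashOperator(task_id="say_goodbye", bash_command="echo goodbye")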
Task Dependencies
Task dependencies define the execution order and workflow structure.
Dependency Operators:
- >>: Sets downstream dependency (task1 >> task2)
- <<: Sets upstream dependency (task2 << task1)
- chain(): Sequential dependencies for multiple tasks
- cross_downstream(): Many-to-many relationships
Dependency Examples:
# Simple linear flow
task1 >> task2 >> task3
# Fan-out pattern
task1 >> [task2, task3, task4]
# Fan-in pattern
[task1, task2, task3] >> task4
# Complex dependencies
first_task >> [second_task, third_task]
third_task << fourth_task
Executors
Executors determine how and where tasks run; a configuration sketch follows the list below.
Executor Types:
- SequentialExecutor: Single-threaded, local (default, not for production)
- LocalExecutor: Multi-threaded, single machine
- CeleryExecutor: Distributed execution using Celery
- KubernetesExecutor: Each task runs in a separate Kubernetes pod
- DaskExecutor: Distributed execution using Dask
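The executor is selected in airflow.cfg or via the matching environment variable; a minimal sketch:
# In airflow.cfg
[core]
executor = LocalExecutor
# Equivalent environment variable
AIRFLOW__CORE__EXECUTOR=LocalExecutor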
Scheduler
The Airflow scheduler:
- Monitors all DAGs and their tasks
- Triggers task instances based on dependencies and schedules
- Submits tasks to executors for execution
- Handles retries and task state management
Starting the Scheduler:
airflow scheduler
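A typical deployment also runs the web UI and, if you use deferrable operators or sensors, the triggerer process:
airflow webserver
airflow triggerer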
DAG Development Patterns
Basic DAG Structure
Every DAG follows this structure:
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
with DAG(
dag_id="basic_dag",
start_date=datetime(2022, 1, 1),
schedule="0 0 * * *",
catchup=False,
) as dag:
task1 = BashOperator(
task_id="task1",
bash_command="echo 'Task 1 executed'"
)
task2 = BashOperator(
task_id="task2",
bash_command="echo 'Task 2 executed'"
)
task1 >> task2
Task Dependencies and Chains
Linear Chain:
from airflow.sdk import chain
# These are equivalent:
task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)
Dynamic Chain:
from airflow.sdk import chain
from airflow.operators.empty import EmptyOperator
# Dynamically generate and chain tasks
chain(*[EmptyOperator(task_id=f"task_{i}") for i in range(1, 6)])
Pairwise Chain:
from airflow.sdk import chain
# Creates paired dependencies:
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
Cross Downstream:
from airflow.sdk import cross_downstream
# Both op1 and op2 feed into both op3 and op4
cross_downstream([op1, op2], [op3, op4])
Branching and Conditional Execution
BranchPythonOperator:
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
if context['data_interval_start'].day == 1:
return 'monthly_task'
return 'daily_task'
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch
)
daily_task = BashOperator(task_id='daily_task', bash_command='echo daily')
monthly_task = BashOperator(task_id='monthly_task', bash_command='echo monthly')
branch >> [daily_task, monthly_task]
Custom Branch Operator:
from airflow.operators.branch import BaseBranchOperator
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
"""
Run extra branch on first day of month
"""
if context['data_interval_start'].day == 1:
return ['daily_task_id', 'monthly_task_id']
elif context['data_interval_start'].day == 2:
return 'daily_task_id'
else:
return None # Skip all downstream tasks
TaskGroups for Organization
TaskGroups help organize related tasks hierarchically:
from airflow.sdk import task_group
from airflow.operators.empty import EmptyOperator
@task_group()
def data_processing_group():
extract = EmptyOperator(task_id="extract")
transform = EmptyOperator(task_id="transform")
load = EmptyOperator(task_id="load")
extract >> transform >> load
@task_group()
def validation_group():
validate_schema = EmptyOperator(task_id="validate_schema")
validate_data = EmptyOperator(task_id="validate_data")
validate_schema >> validate_data
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
start >> data_processing_group() >> validation_group() >> end
Edge Labeling
Add labels to dependency edges for clarity:
from airflow.sdk import Label
# Inline labeling
my_task >> Label("When empty") >> other_task
# Method-based labeling
my_task.set_downstream(other_task, Label("When empty"))
LatestOnlyOperator
Skip tasks if not the latest DAG run:
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
import pendulum
with DAG(
dag_id='latest_only_example',
start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
catchup=True,
schedule="@daily",
) as dag:
latest_only = LatestOnlyOperator(task_id='latest_only')
task1 = EmptyOperator(task_id='task1')
task2 = EmptyOperator(task_id='task2')
task3 = EmptyOperator(task_id='task3')
task4 = EmptyOperator(task_id='task4', trigger_rule='all_done')
latest_only >> task1 >> task3
latest_only >> task4
task2 >> task3
task2 >> task4
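Note the trigger_rule='all_done' on task4: trigger rules control when a task runs based on its upstream task states (the default is all_success, which is why LatestOnlyOperator's skips normally propagate downstream). A short sketch of common values:
from airflow.operators.empty import EmptyOperator
# Runs only when every upstream task succeeded (the default)
t1 = EmptyOperator(task_id="t1", trigger_rule="all_success")
# Runs once all upstream tasks finish, whatever their final state
t2 = EmptyOperator(task_id="t2", trigger_rule="all_done")
# Runs as soon as any single upstream task fails
t3 = EmptyOperator(task_id="t3", trigger_rule="one_failed")
# Runs when no upstream task failed (success and skipped both count)
t4 = EmptyOperator(task_id="t4", trigger_rule="none_failed")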
Operators Deep Dive
BashOperator
Execute bash commands:
from airflow.providers.standard.operators.bash import BashOperator
bash_task = BashOperator(
task_id="bash_example",
bash_command="echo 'Hello from Bash'; date",
env={'MY_VAR': 'value'}, # Environment variables
append_env=True, # Append to existing env vars
output_encoding='utf-8'
)
Complex Bash Command:
bash_complex = BashOperator(
task_id="complex_bash",
bash_command="""
cd /path/to/dir
python process_data.py --input {{ ds }} --output {{ macros.ds_add(ds, 1) }}
if [ $? -eq 0 ]; then
echo "Success"
else
echo "Failed" && exit 1
fi
""",
)
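The {{ ds }} and {{ macros.ds_add(ds, 1) }} placeholders work because bash_command is a templated field rendered with Jinja before execution; a few commonly used template variables:
templated = BashOperator(
    task_id="templated",
    bash_command=(
        "echo logical date: {{ ds }}; "              # YYYY-MM-DD logical date
        "echo run id: {{ run_id }}; "                # unique identifier of this DAG run
        "echo next day: {{ macros.ds_add(ds, 1) }}"  # date arithmetic helper
    ),
)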
PythonOperator
Execute Python functions:
from airflow.providers.standard.operators.python import PythonOperator
def my_python_function(name, **context):
print(f"Hello {name}!")
print(f"Execution date: {context['ds']}")
return "Success"
python_task = PythonOperator(
    task_id="python_example",
    python_callable=my_python_function,
    op_kwargs={'name': 'Airflow'},  # context kwargs are injected automatically in Airflow 2+; provide_context was removed
)
Traditional ETL with PythonOperator:
import json
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
def transform(ti):
# Pull from XCom
order_data_dict = ti.xcom_pull(task_ids="extract")
total_order_value = sum(order_data_dict.values())
return {"total_order_value": total_order_value}
def load(ti):
# Pull from XCom
total = ti.xcom_pull(task_ids="transform")["total_order_value"]
print(f"Total order value is: {total:.2f}")
with DAG(
dag_id="legacy_etl_pipeline",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
extract_task = PythonOperator(task_id="extract", python_callable=extract)
transform_task = PythonOperator(task_id="transform", python_callable=transform)
load_task = PythonOperator(task_id="load", python_callable=load)
extract_task >> transform_task >> load_task
EmailOperator
Send email notifications:
from airflow.providers.smtp.operators.smtp import EmailOperator
email_task = EmailOperator(
task_id='send_email',
to='recipient@example.com',
subject='Airflow Notification',
html_content='<h3>Task completed successfully!</h3>',
cc=['cc@example.com'],
bcc=['bcc@example.com']
)
EmptyOperator
Placeholder for workflow structure:
from airflow.operators.empty import EmptyOperator
start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')
# Useful for organizing complex DAGs
start >> [task1, task2, task3] >> end
Custom Operators
Create reusable custom operators:
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
    # apply_defaults is no longer needed; defaults are applied automatically in Airflow 2+
    def __init__(self, my_param, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_param = my_param
def execute(self, context):
self.log.info(f"Executing with param: {self.my_param}")
# Custom logic here
return "Result"
# Usage
custom_task = MyCustomOperator(
task_id="custom",
my_param="value"
)
Sensors Deep Dive
Sensors are a special type of operator that waits for a condition to be met before allowing downstream tasks to proceed.
ExternalTaskSensor
Wait for tasks in other DAGs:
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
import pendulum
with DAG(
dag_id="example_external_task_sensor",
start_date=pendulum.datetime(2021, 10, 20, tz="UTC"),
catchup=False,
schedule=None,
) as dag:
wait_for_task = ExternalTaskSensor(
task_id="wait_for_task",
external_dag_id="upstream_dag",
external_task_id="upstream_task",
allowed_states=["success"],
failed_states=["failed"],
execution_delta=None, # Same execution_date
timeout=600, # 10 minutes
poke_interval=60, # Check every 60 seconds
)
Deferrable ExternalTaskSensor:
# More efficient - releases worker slot while waiting
wait_for_task_async = ExternalTaskSensor(
task_id="wait_for_task_async",
external_dag_id="upstream_dag",
external_task_id="upstream_task",
allowed_states=["success"],
failed_states=["failed"],
deferrable=True, # Use async mode
)
FileSensor
Wait for files to appear:
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/path/to/file.csv",
poke_interval=30,
timeout=600,
mode='poke' # or 'reschedule' for long waits
)
TimeDeltaSensor
Wait for a specific time period:
from datetime import timedelta
from airflow.sensors.time_delta import TimeDeltaSensor
wait_one_hour = TimeDeltaSensor(
task_id="wait_one_hour",
delta=timedelta(hours=1)
)
BigQuery Table Sensor
Wait for BigQuery table to exist:
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
import pendulum
with DAG(
dag_id="bigquery_sensor_example",
start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
) as dag:
wait_for_table = BigQueryTableExistenceSensor(
task_id="wait_for_table",
project_id="your-project-id",
dataset_id="your_dataset",
table_id="your_table",
bigquery_conn_id="google_cloud_default",
location="US",
poke_interval=60,
timeout=3600,
)
Custom Sensors
Create custom sensors for specific conditions:
from airflow.sensors.base import BaseSensorOperator
class MyCustomSensor(BaseSensorOperator):
    # apply_defaults is no longer needed in Airflow 2+
    def __init__(self, my_condition, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_condition = my_condition
def poke(self, context):
# Return True when condition is met
self.log.info(f"Checking condition: {self.my_condition}")
# Custom logic to check condition
return check_condition(self.my_condition)
Deferrable Sensors
Deferrable sensors release worker slots while waiting:
from datetime import timedelta
from airflow.sdk import BaseSensorOperator, StartTriggerArgs
class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
start_from_trigger = True
def __init__(self, *args, trigger_kwargs=None, start_from_trigger=True, **kwargs):
super().__init__(*args, **kwargs)
if trigger_kwargs:
self.start_trigger_args.trigger_kwargs = trigger_kwargs
self.start_from_trigger = start_from_trigger
def execute_complete(self, context, event=None):
return # Task complete
XComs (Cross-Communication)
XComs enable task-to-task communication by storing and retrieving data.
Basic XCom Usage
Pushing to XCom:
def push_function(**context):
value = "Important data"
context['ti'].xcom_push(key='my_key', value=value)
# Or simply return (uses 'return_value' key)
return value
push_task = PythonOperator(
    task_id='push',
    python_callable=push_function,  # context is passed automatically; provide_context was removed in Airflow 2+
)
Pulling from XCom:
def pull_function(**context):
# Pull by task_id (uses 'return_value' key)
value = context['ti'].xcom_pull(task_ids='push')
# Pull with specific key
value = context['ti'].xcom_pull(task_ids='push', key='my_key')
print(f"Pulled value: {value}")
pull_task = PythonOperator(
    task_id='pull',
    python_callable=pull_function,
)
XCom with TaskFlow API
TaskFlow API automatically manages XComs:
from airflow.decorators import task
@task
def extract():
return {"data": [1, 2, 3, 4, 5]}
@task
def transform(data_dict):
# Automatically receives XCom from extract
total = sum(data_dict['data'])
return {"total": total}
@task
def load(summary):
print(f"Total: {summary['total']}")
# Automatic XCom handling
data = extract()
summary = transform(data)
load(summary)
XCom Best Practices
Size Limitations:
- XComs are stored in the metadata database
- Keep XCom data small (< 1MB recommended)
- For large data, store in external systems and pass references
Example with External Storage:
@task
def process_large_data():
# Process data
large_result = compute_large_dataset()
# Store in S3/GCS
file_path = save_to_s3(large_result, "s3://bucket/result.parquet")
# Return only the path
return {"result_path": file_path}
@task
def consume_large_data(metadata):
# Load from S3/GCS
data = load_from_s3(metadata['result_path'])
process(data)
XCom with Operators
Reading XCom in Templates:
from airflow.providers.standard.operators.bash import BashOperator
process_file = BashOperator(
task_id="process",
bash_command="python process.py {{ ti.xcom_pull(task_ids='extract') }}",
)
XCom with EmailOperator:
from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator
@task
def get_ip():
return "192.168.1.1"
@task(multiple_outputs=True)
def compose_email(external_ip):
return {
'subject': f'Server connected from {external_ip}',
'body': f'Your server is connected from {external_ip}<br>'
}
email_info = compose_email(get_ip())
EmailOperator(
task_id='send_email',
to='example@example.com',
subject=email_info['subject'],
html_content=email_info['body']
)
Dynamic Workflows
Create tasks dynamically based on runtime conditions or external data.
Dynamic Task Generation with Loops
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator
with DAG("dynamic_loop_example", ...) as dag:
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
# Dynamically create tasks
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
task = EmptyOperator(task_id=option)
start >> task >> end
Dynamic Task Mapping
Map over task outputs to create dynamic parallel tasks:
from airflow.decorators import task
@task
def extract():
# Returns list of items to process
return [1, 2, 3, 4, 5]
@task
def transform(item):
# Processes single item
return item * 2
@task
def load(items):
# Receives all transformed items
print(f"Loaded {len(items)} items: {items}")
# Dynamic mapping
data = extract()
transformed = transform.expand(item=data) # Creates 5 parallel tasks
load(transformed)
Mapping with Classic Operators:
from airflow.models.baseoperator import BaseOperator
class ExtractOperator(BaseOperator):
def execute(self, context):
return ["file1.csv", "file2.csv", "file3.csv"]
class TransformOperator(BaseOperator):
def __init__(self, input, **kwargs):
super().__init__(**kwargs)
self.input = input
def execute(self, context):
# Process single file
return f"processed_{self.input}"
extract = ExtractOperator(task_id="extract")
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)
Task Group Mapping
Map over entire task groups:
from airflow.decorators import task, task_group
@task
def add_one(value):
return value + 1
@task
def double(value):
return value * 2
@task_group
def process_group(value):
incremented = add_one(value)
return double(incremented)
@task
def aggregate(results):
print(f"Results: {results}")
# Map task group over values
results = process_group.expand(value=[1, 2, 3, 4, 5])
aggregate(results)
Partial Parameters with Mapping
Mix static and dynamic parameters:
@task
def process(base_path, filename):
full_path = f"{base_path}/{filename}"
return f"Processed {full_path}"
# Static parameter 'base_path', dynamic 'filename'
results = process.partial(base_path="/data").expand(
filename=["file1.csv", "file2.csv", "file3.csv"]
)
TaskFlow API
The modern way to write Airflow DAGs with automatic XCom handling and cleaner syntax.
Basic TaskFlow Example
from airflow.decorators import dag, task
import pendulum
@dag(
dag_id="taskflow_example",
start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
schedule=None,
catchup=False,
)
def my_taskflow_dag():
@task
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
import json
return json.loads(data_string)
@task
def transform(order_data_dict):
total = sum(order_data_dict.values())
return {"total_order_value": total}
@task
def load(summary):
print(f"Total order value: {summary['total_order_value']:.2f}")
# Function calls create task dependencies automatically
order_data = extract()
summary = transform(order_data)
load(summary)
# Instantiate the DAG
my_taskflow_dag()
Multiple Outputs
Return multiple values from tasks:
@task(multiple_outputs=True)
def extract_data():
return {
'orders': [1, 2, 3],
'customers': ['A', 'B', 'C'],
'revenue': 1000.50
}
@task
def process_orders(orders):
print(f"Processing {len(orders)} orders")
@task
def process_customers(customers):
print(f"Processing {len(customers)} customers")
# Access individual outputs
data = extract_data()
process_orders(data['orders'])
process_customers(data['customers'])
Mixing TaskFlow with Traditional Operators
from airflow.decorators import dag, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
@dag(
start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
schedule=None,
)
def mixed_dag():
@task
def get_date():
from datetime import datetime
return datetime.now().strftime("%Y%m%d")
# Traditional operator
bash_task = BashOperator(
task_id="print_date",
bash_command="echo Processing data for {{ ti.xcom_pull(task_ids='get_date') }}"
)
@task
def process_results():
print("Processing complete")
# Mix task types
date = get_date()
date >> bash_task >> process_results()
mixed_dag()
Virtual Environment for Tasks
Isolate task dependencies:
from airflow.decorators import dag, task
import pendulum
@dag(
dag_id="virtualenv_example",
start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
)
def virtualenv_dag():
@task.virtualenv(
requirements=["pandas==1.5.0", "numpy==1.23.0"],
system_site_packages=False
)
def analyze_data():
import pandas as pd
import numpy as np
df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
result = np.mean(df['col1'])
return float(result)
@task
def report_results(mean_value):
print(f"Mean value: {mean_value}")
result = analyze_data()
report_results(result)
virtualenv_dag()
Asset-Based Scheduling
Schedule DAGs based on data assets (formerly datasets) rather than time.
Producer-Consumer Pattern
from airflow.sdk import DAG, Asset
from airflow.operators.bash import BashOperator
from datetime import datetime
# Define asset
customer_data = Asset("s3://my-bucket/customers.parquet")
# Producer DAG
with DAG(
dag_id="producer_dag",
start_date=datetime(2023, 1, 1),
schedule="@daily",
) as producer:
BashOperator(
task_id="generate_data",
bash_command="python generate_customers.py",
outlets=[customer_data] # Marks asset as updated
)
# Consumer DAG - triggered when asset updates
with DAG(
dag_id="consumer_dag",
schedule=[customer_data], # Triggered by asset
start_date=datetime(2023, 1, 1),
catchup=False,
) as consumer:
BashOperator(
task_id="process_data",
bash_command="python process_customers.py"
)
Multiple Asset Dependencies
AND Logic (all assets must update):
from airflow.sdk import Asset
asset_1 = Asset("s3://bucket/file1.csv")
asset_2 = Asset("s3://bucket/file2.csv")
with DAG(
dag_id="wait_for_both",
schedule=(asset_1 & asset_2), # Both must update
start_date=datetime(2023, 1, 1),
):
pass
OR Logic (any asset update triggers):
asset_1 = Asset("s3://bucket/file1.csv")
asset_2 = Asset("s3://bucket/file2.csv")
with DAG(
dag_id="triggered_by_either",
schedule=(asset_1 | asset_2), # Either can trigger
start_date=datetime(2023, 1, 1),
):
pass
Complex Logic:
asset_1 = Asset("s3://bucket/file1.csv")
asset_2 = Asset("s3://bucket/file2.csv")
asset_3 = Asset("s3://bucket/file3.csv")
with DAG(
dag_id="complex_condition",
schedule=(asset_1 | (asset_2 & asset_3)), # asset_1 OR (asset_2 AND asset_3)
start_date=datetime(2023, 1, 1),
):
pass
Asset Aliases
Use aliases for flexible asset references:
from airflow.sdk import Asset, AssetAlias
from airflow.decorators import task
# Producer with alias
with DAG(dag_id="alias_producer", start_date=datetime(2023, 1, 1)):
@task(outlets=[AssetAlias("my-alias")])
def produce_data(*, outlet_events):
# Dynamically add actual asset
outlet_events[AssetAlias("my-alias")].add(
Dataset("s3://bucket/my-file.csv")
)
# Consumer depending on alias
with DAG(
dag_id="alias_consumer",
schedule=AssetAlias("my-alias"),
start_date=datetime(2023, 1, 1),
):
pass
Accessing Asset Event Information
@task
def process_asset_data(*, triggering_asset_events):
    # Mapping of asset URI -> list of asset events that triggered this run
    for uri, events in triggering_asset_events.items():
        for event in events:
            print(f"Asset: {uri}")
            print(f"Timestamp: {event.timestamp}")
            print(f"Extra: {event.extra}")
Scheduling Patterns
Cron Expressions
# Every day at midnight
schedule="0 0 * * *"
# Every Monday at 9 AM
schedule="0 9 * * 1"
# Every 15 minutes
schedule="*/15 * * * *"
# First day of month at noon
schedule="0 12 1 * *"
# Weekdays at 6 PM
schedule="0 18 * * 1-5"
Timedelta Scheduling
from datetime import timedelta
with DAG(
dag_id="timedelta_schedule",
start_date=datetime(2023, 1, 1),
schedule=timedelta(hours=6), # Every 6 hours
):
pass
Preset Schedules
# Common presets
schedule="@once" # Run once
schedule="@hourly" # Every hour
schedule="@daily" # Daily at midnight
schedule="@weekly" # Every Sunday at midnight
schedule="@monthly" # First day of month at midnight
schedule="@yearly" # January 1st at midnight
schedule=None # Manual trigger only
Catchup and Backfilling
Catchup:
with DAG(
dag_id="catchup_example",
start_date=datetime(2023, 1, 1),
schedule="@daily",
catchup=True, # Run all missed intervals
):
pass
with DAG(
dag_id="no_catchup",
start_date=datetime(2023, 1, 1),
schedule="@daily",
catchup=False, # Only run latest interval
):
pass
Manual Backfilling:
# Backfill specific date range
airflow dags backfill \
--start-date 2023-01-01 \
--end-date 2023-01-31 \
my_dag_id
# Backfill with marking success (no execution)
airflow dags backfill \
--start-date 2023-01-01 \
--end-date 2023-01-31 \
--mark-success \
my_dag_id
Production Patterns
Error Handling and Retries
Task-Level Retries:
from airflow.operators.bash import BashOperator
from datetime import timedelta
task_with_retry = BashOperator(
task_id="retry_task",
bash_command="python might_fail.py",
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(minutes=30),
)
DAG-Level Default Args:
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id="production_dag",
default_args=default_args,
start_date=datetime(2023, 1, 1),
schedule="@daily",
):
pass
Task Concurrency Control
Per-Task Concurrency:
from airflow.operators.bash import BashOperator
from datetime import timedelta
# Limit concurrent instances of this task
limited_task = BashOperator(
task_id="limited_task",
bash_command="echo 'Processing'",
max_active_tis_per_dag=3 # Max 3 instances running
)
DAG-Level Concurrency:
with DAG(
dag_id="concurrent_dag",
start_date=datetime(2023, 1, 1),
schedule="@daily",
max_active_runs=5, # Max 5 DAG runs simultaneously
max_active_tasks=10, # Max 10 task instances across all runs (formerly 'concurrency')
):
pass
Idempotency
Make tasks idempotent for safe retries:
@task
def idempotent_load(**context):
execution_date = context['ds']
# Delete existing data for this date first
delete_query = f"""
DELETE FROM target_table
WHERE date = '{execution_date}'
"""
execute_sql(delete_query)
# Insert new data
insert_query = f"""
INSERT INTO target_table
SELECT * FROM source
WHERE date = '{execution_date}'
"""
execute_sql(insert_query)
SLAs and Alerts
from datetime import timedelta
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
print(f"SLA missed for {task_list}")
# Send alert to monitoring system
with DAG(
dag_id="sla_dag",
start_date=datetime(2023, 1, 1),
schedule="@daily",
default_args={
'sla': timedelta(hours=2), # Task should complete in 2 hours
},
sla_miss_callback=sla_miss_callback,
):
pass
Task Callbacks
def on_failure_callback(context):
print(f"Task {context['task_instance'].task_id} failed")
# Send to Slack, PagerDuty, etc.
def on_success_callback(context):
print(f"Task {context['task_instance'].task_id} succeeded")
def on_retry_callback(context):
print(f"Task {context['task_instance'].task_id} retrying")
task_with_callbacks = BashOperator(
task_id="monitored_task",
bash_command="python my_script.py",
on_failure_callback=on_failure_callback,
on_success_callback=on_success_callback,
on_retry_callback=on_retry_callback,
)
Docker Deployment
Docker Compose for Local Development:
version: '3'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
webserver:
image: apache/airflow:2.7.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
ports:
- "8080:8080"
command: webserver
scheduler:
image: apache/airflow:2.7.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
command: scheduler
Kubernetes Executor
KubernetesExecutor Configuration:
# In airflow.cfg
[kubernetes_executor]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0
delete_worker_pods = True
delete_worker_pods_on_failure = False
[core]
executor = KubernetesExecutor
Pod Override for Specific Task:
from kubernetes.client import models as k8s
task_with_gpu = BashOperator(
task_id="gpu_task",
bash_command="python train_model.py",
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
limits={"nvidia.com/gpu": "1"}
)
)
]
)
)
}
)
Monitoring and Logging
Structured Logging:
from airflow.decorators import task
import logging
@task
def monitored_task():
logger = logging.getLogger(__name__)
logger.info("Starting data processing", extra={
'process_id': 'abc123',
'record_count': 1000
})
try:
process_data()
logger.info("Processing complete")
except Exception as e:
logger.error(f"Processing failed: {str(e)}", extra={
'error_type': type(e).__name__
})
raise
StatsD Metrics:
import time
from airflow.decorators import task
from airflow.stats import Stats
@task
def task_with_metrics():
Stats.incr('my_dag.task_started')
start_time = time.time()
process_data()
duration = time.time() - start_time
Stats.timing('my_dag.task_duration', duration)
Stats.incr('my_dag.task_completed')
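Emitting metrics requires StatsD to be enabled in the Airflow configuration; a minimal sketch (host and port are assumptions for your environment):
# In airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow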
Best Practices
DAG Design
- Keep DAGs Simple: Break complex workflows into multiple DAGs
- Use Descriptive Names: dag_id and task_id should be self-explanatory
- Idempotent Tasks: Tasks should produce same result when re-run
- Small XComs: Keep XCom data under 1MB
- External Storage: Use S3/GCS for large data, pass references
- Proper Dependencies: Model true dependencies, avoid unnecessary ones
- Error Handling: Use retries, callbacks, and proper error logging
- Resource Management: Set appropriate task concurrency limits
Code Organization
dags/
├── common/
│ ├── __init__.py
│ ├── operators.py # Custom operators
│ ├── sensors.py # Custom sensors
│ └── utils.py # Utility functions
├── etl/
│ ├── customer_pipeline.py
│ ├── order_pipeline.py
│ └── product_pipeline.py
├── ml/
│ ├── training_dag.py
│ └── inference_dag.py
└── maintenance/
├── cleanup_dag.py
└── backup_dag.py
Testing DAGs
Unit Testing:
import pytest
from airflow.models import DagBag
def test_dag_loaded():
dagbag = DagBag(dag_folder='dags/', include_examples=False)
assert len(dagbag.import_errors) == 0
def test_task_count():
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
assert len(dag.tasks) == 5
def test_task_dependencies():
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
extract = dag.get_task('extract')
transform = dag.get_task('transform')
assert transform in extract.downstream_list
Integration Testing:
from datetime import datetime
from airflow.models import DagBag
from airflow.utils.state import State
def test_dag_runs():
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
# Test DAG run
dag_run = dag.create_dagrun(
state=State.RUNNING,
execution_date=datetime(2023, 1, 1),
run_type='manual'
)
# Run specific task
task_instance = dag_run.get_task_instance('extract')
task_instance.run()
assert task_instance.state == State.SUCCESS
Performance Optimization
- Use Deferrable Operators: For sensors and long-running waits
- Dynamic Task Mapping: For parallel processing
- Appropriate Executor: Choose based on scale (Local, Celery, Kubernetes)
- Connection Pooling: Reuse database connections
- Task Parallelism: Set max_active_runs and max_active_tasks appropriately; use pools (sketched after this list) for shared resources
- Lazy Loading: Don't execute heavy logic at DAG parse time
- External Storage: Keep metadata database light
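Pools complement the parallelism settings above by capping how many task instances use a shared resource at once; the pool name and size here are illustrative:
from airflow.providers.standard.operators.bash import BashOperator
# Create the pool once, e.g.: airflow pools set warehouse_pool 5 "Warehouse connections"
query_task = BashOperator(
    task_id="query_warehouse",
    bash_command="python run_query.py",
    pool="warehouse_pool",  # at most 5 pool slots in use at any moment
    pool_slots=1,           # how many slots this task occupies
)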
Security
- Secrets Management: Use an Airflow Secrets Backend instead of hardcoded credentials (config sketch after this list)
- Connection Encryption: Use encrypted connections for databases
- RBAC: Enable role-based access control
- Audit Logging: Enable audit logs for compliance
- Network Isolation: Restrict worker network access
- Credential Rotation: Regularly rotate credentials
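For secrets management, Airflow supports pluggable secrets backends configured in airflow.cfg; a sketch using the AWS Secrets Manager backend (the prefixes are assumptions for your setup):
# In airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}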
Configuration Management
# Use Variables for configuration
from airflow.models import Variable
config = Variable.get("my_config", deserialize_json=True)
api_key = Variable.get("api_key")
# Use Connections for external services
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('my_postgres')
db_url = f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"
Common Patterns and Examples
See EXAMPLES.md for 18+ detailed real-world examples including:
- ETL pipelines
- Machine learning workflows
- Data quality checks
- Multi-cloud orchestration
- Event-driven architectures
- Complex branching logic
- Dynamic task generation
- Asset-based scheduling
- Sensor patterns
- Error handling strategies
Troubleshooting
DAG Not Appearing in UI
- Check for Python syntax errors in DAG file
- Verify DAG file is in correct directory
- Check dag_id is unique
- Ensure schedule is not None if you expect it to run
- Check scheduler logs for import errors (or use the CLI, shown below)
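Import errors can be surfaced directly from the CLI, which is quicker than digging through scheduler logs:
# List the DAGs the scheduler can see
airflow dags list
# Show import errors with their tracebacks
airflow dags list-import-errors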
Tasks Not Running
- Check task dependencies are correct
- Verify upstream tasks succeeded
- Check task concurrency limits
- Ensure executor has available slots
- Review task logs for errors (airflow tasks test, shown below, helps isolate failures)
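airflow tasks test runs a single task in isolation (no scheduler, no retries, logs to stdout), which quickly separates task bugs from scheduling problems; the DAG and task IDs here are placeholders:
# Run one task for a given logical date without recording state in the metadata DB
airflow tasks test my_dag_id extract 2023-01-01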
Performance Issues
- Reduce DAG complexity (break into multiple DAGs)
- Optimize SQL queries in tasks
- Use appropriate executor for scale
- Enable task parallelism
- Check for slow sensors (use deferrable mode)
- Monitor metadata database performance
Common Errors
Import Errors:
# Bad - imports at DAG level slow parsing
from heavy_library import process
with DAG(...):
pass
# Good - imports inside tasks
with DAG(...):
@task
def my_task():
from heavy_library import process
process()
Circular Dependencies:
# This will fail
task1 >> task2 >> task3 >> task1 # Circular!
# Must be acyclic
task1 >> task2 >> task3
Large XComs:
# Bad - storing large data in XCom
@task
def process():
large_df = pd.read_csv('big_file.csv')
return large_df # Too large!
# Good - store reference
@task
def process():
large_df = pd.read_csv('big_file.csv')
path = save_to_s3(large_df)
return path # Just the path
Resources
- Official Documentation: https://airflow.apache.org/docs/
- Airflow GitHub: https://github.com/apache/airflow
- Astronomer Guides: https://docs.astronomer.io/learn
- Community Slack: https://apache-airflow.slack.com
- Stack Overflow: Tag apache-airflow
- Awesome Airflow: https://github.com/jghoman/awesome-apache-airflow
Skill Version: 1.0.0 Last Updated: January 2025 Apache Airflow Version: 2.7+ Skill Category: Data Engineering, Workflow Orchestration, Pipeline Management