Claude Code Plugins

Community-maintained marketplace


airflow-workflows

@timequity/vibe-coder

Apache Airflow DAG design, operators, and scheduling best practices.

Install Skill

1. Download the skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reviewing its instructions before using it.

SKILL.md

name: airflow-workflows
description: Apache Airflow DAG design, operators, and scheduling best practices.

Airflow Workflows

DAG Structure

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


def extract_function() -> None:
    """Placeholder: pull source data into staging."""


def load_function() -> None:
    """Placeholder: publish transformed data."""


default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule='0 6 * * *',  # 6 AM daily; `schedule` replaces `schedule_interval` in Airflow 2.4+
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_function,
    )

    transform = SQLExecuteQueryOperator(
        task_id='transform_data',
        conn_id='warehouse',
        sql='sql/transform.sql',  # templated file, resolved relative to the DAG folder
    )

    load = PythonOperator(
        task_id='load_data',
        python_callable=load_function,
    )

    extract >> transform >> load

Common Operators

Operator                   Use Case
PythonOperator             Custom Python code
BashOperator               Shell commands
SQLExecuteQueryOperator    Database queries
S3ToSnowflakeOperator      S3-to-Snowflake data transfers
DbtCloudRunJobOperator     dbt Cloud jobs
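
For reference, a minimal BashOperator sketch; the task id and staging path are illustrative:

from airflow.operators.bash import BashOperator

cleanup = BashOperator(
    task_id='cleanup_staging',
    # bash_command is templated; {{ ds }} renders to the run's logical date
    bash_command='rm -rf /tmp/etl_staging/{{ ds }}',
)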

Best Practices

  1. Idempotent tasks - safe to re-run without duplicating data
  2. Small tasks - easier to debug and retry
  3. XCom sparingly - pass only small metadata, never large datasets
  4. Templating - use {{ ds }} for run dates (see the sketch after this list)
  5. Sensors wisely - prefer mode='reschedule' so waiting sensors don't block worker slots
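
A sketch of practices 1 and 4 together; the callable and task id are illustrative, and op_kwargs values are templated before each run:

def load_partition(ds: str) -> None:
    # Overwrite (never append to) the partition for this run date,
    # so retries and backfills produce the same result.
    print(f"replacing partition for {ds}")

load_users = PythonOperator(
    task_id='load_users_partition',
    python_callable=load_partition,
    op_kwargs={'ds': '{{ ds }}'},  # rendered per run, e.g. '2024-01-01'
)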

Task Dependencies

# Linear
task1 >> task2 >> task3

# Parallel
[task1, task2] >> task3

# Fan-out, then fan-in
task1 >> [task2, task3]
[task2, task3] >> task4
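
For longer or programmatically built sequences, the chain helper expresses the same dependencies without repeated >>; a minimal sketch assuming task1 through task4 already exist:

from airflow.models.baseoperator import chain

# Equivalent to task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)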

Dynamic Task Generation

# Runs inside the with DAG(...) block; task_id must be unique per table.
def process_table(table: str) -> None:
    """Placeholder: replace with real per-table processing."""
    print(f"processing {table}")

for table in ['users', 'orders', 'products']:
    PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        op_kwargs={'table': table},
    )